package db

import (
	"bytes"
	"database/sql"
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	"github.com/blkchain/blkchain"
	"github.com/lib/pq"
)

// Explanation of how we handle integers. In Bitcoin structures most
// integers are uint32. Postgres does not have an unsigned int type,
// but using a bigint to store integers seems like a waste of
// space. So we cast all uints to int32, and thus 0xFFFFFFFF would
// become -1 in Postgres, which is fine as long as we know all the
// bits are correct.

var (
	// writerWg tracks the four table writer goroutines started by
	// pgBlockWorker.
	writerWg sync.WaitGroup
	// errors is set once any writer reports an error; commit() refuses
	// to commit from then on.
	// NOTE(review): this flag is written by the error-logging goroutine
	// and read by other goroutines without synchronization. Its type is
	// left unchanged here because other files in the package may refer
	// to it.
	errors bool
)

// PGWriter receives blocks via WriteBlock and writes them to Postgres
// using a background worker goroutine.
type PGWriter struct {
	blockCh    chan *BlockRec  // blocks queued for the background worker
	wg         *sync.WaitGroup // released when the background worker exits
	db         *sql.DB         // nil when the connection string is "nulldb"
	start      time.Time       // creation time, see Uptime
	zfsDataset string          // if non-empty, snapshots are taken around index creation
	parallel   int             // parallelism passed to fixPrevoutTxId
}

// isUTXOer is consulted by the txouts writer; an output for which
// IsUTXO returns false is recorded as spent.
type isUTXOer interface {
	IsUTXO(blkchain.Uint256, uint32) (bool, error)
}

// NewPGWriter opens the database given by connstr, creates the schema
// if it does not exist yet and starts the background worker. The
// special connstr "nulldb" skips the database entirely. A first import
// (tables did not exist) requires a non-nil utxo checker.
func NewPGWriter(connstr string, cacheSize int, utxo isUTXOer, zfsDataset string, parallel int) (*PGWriter, error) {

	start := time.Now()

	var (
		wg          sync.WaitGroup
		firstImport = true
		db          *sql.DB
	)

	if connstr != "nulldb" {
		var err error
		if db, err = sql.Open("postgres", connstr); err != nil {
			return nil, err
		}
		if firstImport, err = preparePGSchema(db, utxo); err != nil {
			// Do not leak the connection pool on a failed start;
			// the setup error is the one worth reporting.
			_ = db.Close()
			return nil, err
		}
	}

	bch := make(chan *BlockRec, 2)
	wg.Add(1)

	w := &PGWriter{
		blockCh:    bch,
		wg:         &wg,
		db:         db,
		start:      start,
		zfsDataset: zfsDataset,
		parallel:   parallel,
	}

	go w.pgBlockWorker(bch, &wg, firstImport, cacheSize, utxo)

	return w, nil
}

// preparePGSchema creates the pgcrypto extension, the tables and the
// _prevout_miss scratch table. It reports whether this is a first
// import, i.e. whether the tables did not exist before the call.
func preparePGSchema(db *sql.DB, utxo isUTXOer) (firstImport bool, err error) {
	if err := createPgcrypto(db); err != nil {
		return false, err
	}

	firstImport = true
	if err := createTables(db); err != nil {
		if !strings.Contains(err.Error(), "already exists") {
			return false, err
		}
		// this is fine, cancel deferred index/constraint creation
		firstImport = false
		log.Printf("Tables already exist, catching up.")
	}

	if firstImport {
		if utxo == nil {
			return false, fmt.Errorf("First import must be done with UTXO checker, i.e. from LevelDb directly. (utxo == nil)")
		}
		log.Printf("Tables created without indexes, which are created at the very end.")
		log.Printf("Setting table parameters: autovacuum_enabled=false")
		if err := setTableStorageParams(db); err != nil {
			return false, err
		}
	}

	if err := createPrevoutMissTable(db); err != nil {
		return false, err
	}
	return firstImport, nil
}

// Close signals the background worker that no more blocks are coming
// and waits for it to finish.
func (w *PGWriter) Close() {
	close(w.blockCh)
	w.wg.Wait()
}

// Uptime returns the time elapsed since the writer was created.
func (w *PGWriter) Uptime() time.Duration {
	return time.Since(w.start)
}

// WriteBlock queues a block for writing. When sync is true it waits
// for the worker to report the outcome and returns an error if the
// block was not written.
func (w *PGWriter) WriteBlock(b *BlockRec, sync bool) error {
	if sync {
		b.sync = make(chan bool)
	}
	w.blockCh <- b
	if sync {
		if ok := <-b.sync; !ok {
			log.Printf("Error writing block: %v", b.Block.Hash())
			return fmt.Errorf("Error writing block: %v", b.Block.Hash())
		}
	}
	return nil
}

// HeightAndHashes returns the hashes of the most recent blocks keyed by
// height, see getHeightAndHashes.
func (w *PGWriter) HeightAndHashes(back int) (map[int][]blkchain.Uint256, error) {
	return getHeightAndHashes(w.db, back)
}

// pgBlockWorker is the background goroutine behind a PGWriter. It
// starts one writer goroutine per table, feeds them the blocks received
// on ch, and once ch is closed creates indexes and constraints (first
// import only) and marks orphans.
func (w *PGWriter) pgBlockWorker(ch <-chan *BlockRec, wg *sync.WaitGroup, firstImport bool, cacheSize int, utxo isUTXOer) {
	defer wg.Done()

	bid, err := getLastBlockId(w.db)
	if err != nil {
		log.Printf("Error getting last block id, exiting: %v", err)
		return
	}
	txid, err := getLastTxId(w.db)
	if err != nil {
		log.Printf("Error getting last tx id, exiting: %v", err)
		return
	}

	// Looked up before the writers are started so that a failure here
	// does not leave four writer goroutines behind.
	hashes, err := getHeightAndHashes(w.db, 1)
	if err != nil {
		log.Printf("Error getting last hash and height, exiting: %v", err)
		return
	}

	// Errors must be consumed for as long as writers are alive, or else
	// a writer reporting a second or third error would block forever.
	errCh := make(chan error, 1)
	errLogged := make(chan struct{})
	go func() {
		defer close(errLogged)
		for err := range errCh {
			log.Printf("ERROR (all commits disabled now): %v", err)
			errors = true
		}
	}()

	// The counter must be raised before the writers start, each of
	// them calls writerWg.Done() on exit.
	writerWg.Add(4)

	blockCh := make(chan *BlockRec, 2)
	go pgBlockWriter(blockCh, errCh, w.db)

	txCh := make(chan *txRec, 64)
	go pgTxWriter(txCh, errCh, w.db)

	txInCh := make(chan *txInRec, 64)
	go pgTxInWriter(txInCh, errCh, w.db, firstImport)

	txOutCh := make(chan *txOutRec, 64)
	go pgTxOutWriter(txOutCh, errCh, w.db, utxo)

	// nil utxo means this is coming from a btcnode, we do not need to skip blocks
	if utxo != nil && len(hashes) > 0 {
		var bhash blkchain.Uint256
		for _, hh := range hashes {
			bhash = hh[len(hh)-1] // last hash in the list is the last hash
		}
		log.Printf("PGWriter ignoring blocks up to hash %v", bhash)
		skip, last := 0, time.Now()
		for b := range ch {
			if b.Block.Hash() == bhash {
				break
			}
			skip++
			if skip%10 == 0 && time.Since(last) > 5*time.Second {
				log.Printf(" - ignored %d blocks...", skip)
				last = time.Now()
			}
		}
		if skip > 0 {
			log.Printf("Ignored %d total blocks.", skip)
		}
	}

	idCache := newTxIdCache(cacheSize)

	var syncCh chan bool
	if !firstImport {
		// no firstImport means that the constraints already
		// exist, and we need to wait for a tx to be commited before
		// ins/outs can be inserted. Same with block/tx.
		syncCh = make(chan bool)

		// The cache must be warmed up in (a rare) case when we encounter a chain split
		// soon after we start to avoid duplicate txid key errors
		nBlocks := 6
		log.Printf("Warming up the txid cache going back %d blocks...", nBlocks)
		if err := warmupCache(w.db, idCache, nBlocks); err != nil {
			log.Printf("Error warming up the idCache: %v", err)
		}
		log.Printf("Warming up the cache done.")
	}

	txcnt, start, lastStatus, lastCacheStatus, lastHeight := 0, time.Now(), time.Now(), 0, -1
	blkCnt, blkSz, batchSz := 0, 0, 0
	for br := range ch {
		bid++
		blkCnt++
		br.Id = bid
		br.Hash = br.Block.Hash()

		if br.Height < 0 { // We have to look it up
			hashes, err := getHeightAndHashes(w.db, 5)
			if err != nil {
				log.Printf("pgBlockWorker() error: %v", err)
			}
		hloop:
			for height, hh := range hashes {
				for _, h := range hh {
					if h == br.PrevHash {
						br.Height = height + 1
						break hloop
					}
				}
			}
			if br.Height < 0 {
				log.Printf("pgBlockWorker: Could not connect block to a previous block on our chain, ignoring it.")
				if br.sync != nil {
					br.sync <- false
				}
				continue
			}
		}
		lastHeight = br.Height

		blkSz += br.Size()
		batchSz += br.Size()

		blockCh <- br

		for n, tx := range br.Txs {
			txid++
			txcnt++

			hash := tx.Hash()

			// Check if recently seen and add to cache.
			recentId := idCache.add(hash, txid, len(tx.TxOuts))
			txCh <- &txRec{
				id:      recentId,
				n:       n,
				blockId: bid,
				tx:      tx,
				hash:    hash,
				dupe:    recentId != txid, // dupes are not written but referred to in block_txs
			}

			if recentId != txid {
				// This is a recent transaction, nothing to do
				continue
			}

			for n, txin := range tx.TxIns {
				txInCh <- &txInRec{
					txId:    txid,
					n:       n,
					txIn:    txin,
					idCache: idCache,
				}
			}

			for n, txout := range tx.TxOuts {
				txOutCh <- &txOutRec{
					txId:  txid,
					n:     n,
					txOut: txout,
					hash:  hash,
				}
			}

			if errors {
				log.Printf("Errors detected, aborting.")
				break
			}
		}

		if errors {
			if br.sync != nil {
				br.sync <- false
			}
			break
		}

		if !firstImport {
			// commit after every block
			blockCh <- &BlockRec{
				sync: syncCh,
			}
			if syncCh != nil {
				<-syncCh
			}
			txCh <- &txRec{
				sync: syncCh,
			}
			if syncCh != nil {
				<-syncCh
			}
			// NB: Outputs must be commited before inputs!
			txOutCh <- &txOutRec{
				sync: syncCh,
			}
			if syncCh != nil {
				<-syncCh
			}
			if br.sync != nil {
				// wait for it to finish
				txInCh <- &txInRec{
					sync: syncCh,
				}
				if syncCh != nil {
					<-syncCh
					br.sync <- true
				}
			} else {
				// we don't care when it finishes
				txInCh <- nil
			}
		} else if bid%1024 == 0 {
			// commit every N blocks
			if batchSz > 100_000_000 {
				// but not less than a specific batch size
				batchSz = 0
				blockCh <- nil
				txCh <- nil
				txInCh <- nil
				txOutCh <- nil
			}
		}

		// report progress
		if time.Since(lastStatus) > 5*time.Second {
			uptime := w.Uptime().Round(time.Second)
			log.Printf("Height: %d Txs: %d Time: %v Tx/s: %02f KB/s: %02f Runtime: %s",
				br.Height,
				txcnt,
				time.Unix(int64(br.Time), 0),
				float64(txcnt)/time.Since(start).Seconds(),
				float64(blkSz/1024)/time.Since(start).Seconds(),
				uptime)
			lastStatus = time.Now()
			lastCacheStatus++
			if lastCacheStatus == 5 {
				idCache.reportStats(false)
				lastCacheStatus = 0
			}
		}
	}

	close(blockCh)
	close(txInCh)
	close(txOutCh)
	close(txCh)

	log.Printf("Closed db channels, waiting for workers to finish...")
	writerWg.Wait()
	log.Printf("Workers finished.")

	// All senders are gone. Let the error logger drain what is left so
	// that the errors flag checked below reflects every reported error.
	close(errCh)
	<-errLogged

	if blkCnt == 0 {
		return
	}

	idCache.reportStats(true) // Final stats one last time

	if errors || w.db == nil {
		return
	}

	verbose := firstImport

	if firstImport {
		idCache.clear()
		log.Printf("Cleared the cache.")

		// First only indexes necessary for fixPrevoutTxId
		log.Printf("Creating indexes part 1, please be patient, this may take a long time...")
		if err := createIndexes1(w.db, verbose); err != nil {
			log.Printf("Error creating indexes: %v", err)
		}

		if len(w.zfsDataset) > 0 {
			takeSnapshot(w.db, w.zfsDataset, lastHeight, "-preindex")
		}

		if idCache.miss > 0 {
			log.Printf("Running ANALYZE txins, _prevout_miss, txs to ensure the next step selects the optimal plan...")
			start := time.Now()
			if err := fixPrevoutTxIdAnalyze(w.db); err != nil {
				log.Printf("Error running ANALYZE: %v", err)
			}
			log.Printf("...done in %s. Fixing missing prevout_tx_id entries (if needed), this may take a long time..",
				time.Since(start).Round(time.Millisecond))
			start = time.Now()
			if err := fixPrevoutTxId(w.db, w.parallel, true); err != nil {
				log.Printf("Error fixing prevout_tx_id: %v", err)
			}
			log.Printf("...done in %s.", time.Since(start).Round(time.Millisecond))
		} else {
			log.Printf("NOT fixing missing prevout_tx_id entries because there were 0 cache misses.")
		}

		// Now the rest of the indexes
		log.Printf("Creating indexes part 2, please be patient, this may take a long time...")
		if err := createIndexes2(w.db, verbose); err != nil {
			log.Printf("Error creating indexes: %v", err)
		}

		// Finally constraints
		log.Printf("Creating constraints (if needed), please be patient, this may take a long time...")
		if err := createConstraints(w.db, verbose); err != nil {
			log.Printf("Error creating constraints: %v", err)
		}

		// NOTE: It is imperative that this trigger is created *after* the fixPrevoutTxId() call, or else these
		// triggers will be needlessly triggered slowing fixPrevoutTxId() tremendously. The trigger sets the spent
		// column, which should anyway be correctly set during the initial import based on the LevelDb UTXO set.
		log.Printf("Creating txins triggers.")
		if err := createTxinsTriggers(w.db); err != nil {
			log.Printf("Error creating txins triggers: %v", err)
		}
	}

	log.Printf("Dropping _prevout_miss table.")
	if err := dropPrevoutMissTable(w.db); err != nil {
		log.Printf("Error dropping _prevout_miss table: %v", err)
	}

	orphanLimit, start := 0, time.Now()
	if !firstImport {
		// No need to walk back the entire chain
		orphanLimit = blkCnt + 50
		log.Printf("Marking orphan blocks (going back %d blocks)...", orphanLimit)
	} else {
		log.Printf("Marking orphan blocks (whole chain)...")
	}
	if err := w.SetOrphans(orphanLimit); err != nil {
		log.Printf("Error marking orphans: %v", err)
	}
	log.Printf("Done marking orphan blocks in %s.", time.Since(start).Round(time.Millisecond))

	if firstImport {
		if err := resetTableStorageParams(w.db); err != nil {
			log.Printf("Error resetting storage parameters: %v", err)
		}
		log.Printf("Reset table storage parameters: autovacuum_enabled.")
		log.Printf("Indexes and constraints created.")
		if len(w.zfsDataset) > 0 {
			takeSnapshot(w.db, w.zfsDataset, lastHeight, "-postindex")
		}
	}
}

// pgBlockWriter copies the block records received on c into the blocks
// table. A nil record, or one without a Block, is a commit signal: the
// current transaction is committed, a new one is started and, if the
// record carries a sync channel, the sender is notified.
func pgBlockWriter(c chan *BlockRec, errCh chan<- error, db *sql.DB) {
	defer writerWg.Done()

	cols := []string{"id", "height", "hash", "version", "prevhash", "merkleroot", "time",
		"bits", "nonce", "orphan", "size", "base_size", "weight", "virt_size"}

	txn, stmt, err := begin(db, "blocks", cols)
	if err != nil {
		errCh <- fmt.Errorf("pgBlockWriter1: %v", err)
		return
	}

	for br := range c {
		if br == nil || br.Block == nil { // commit signal
			if err := commit(stmt, txn, nil); err != nil {
				errCh <- fmt.Errorf("BLock commit error: %v", err)
			}
			txn, stmt, err = begin(db, "blocks", cols)
			if err != nil {
				errCh <- fmt.Errorf("pgBlockWriter2: %v", err)
			}
			if br != nil && br.sync != nil {
				br.sync <- true
			}
			continue
		}

		if stmt == nil {
			continue
		}
		b := br.Block
		if _, err := stmt.Exec(
			br.Id,
			br.Height,
			br.Hash[:],
			int32(b.Version),
			b.PrevHash[:],
			b.HashMerkleRoot[:],
			int32(b.Time),
			int32(b.Bits),
			int32(b.Nonce),
			br.Orphan,
			br.Size(),
			br.BaseSize(),
			br.Weight(),
			br.VirtualSize(),
		); err != nil {
			// Like the other writers, report and keep receiving:
			// the producer may be about to send a commit signal
			// and wait for the answer, exiting here would leave
			// it blocked forever.
			errCh <- fmt.Errorf("pgBlockWriter3: %v", err)
		}
	}

	log.Printf("Block writer channel closed, commiting transaction.")
	if err = commit(stmt, txn, nil); err != nil {
		errCh <- fmt.Errorf("pgBlockWriter4: %v", err)
	}
	log.Printf("Block writer done.")
}

// pgTxWriter copies the transaction records received on c into the txs
// and block_txs tables using a small pool of workers. A nil record, or
// one without a tx, is a commit signal (see pgBlockWriter).
func pgTxWriter(c chan *txRec, errCh chan<- error, db *sql.DB) {
	defer writerWg.Done()

	cols := []string{"id", "txid", "version", "locktime", "size", "base_size", "weight", "virt_size"}
	bcols := []string{"block_id", "n", "tx_id"}

	txn, stmt, err := begin(db, "txs", cols)
	if err != nil {
		errCh <- fmt.Errorf("pgTxWriter1: %v", err)
		return
	}

	btxn, bstmt, err := begin(db, "block_txs", bcols)
	if err != nil {
		if txn != nil {
			_ = txn.Rollback() // do not leave the first transaction open
		}
		errCh <- fmt.Errorf("pgTxWriter2: %v", err)
		return
	}

	// It's not clear that parallelizing Txs provides any speed
	// advantage, but whatever.
	nWorkers := 3
	wch := make(chan *txRec, 4)
	var wg sync.WaitGroup
	worker := func() {
		defer wg.Done()
		for tr := range wch {
			if !tr.dupe {
				if stmt != nil {
					t := tr.tx
					if _, err := stmt.Exec(
						tr.id,
						tr.hash[:],
						int32(t.Version),
						int32(t.LockTime),
						t.Size(),
						t.BaseSize(),
						t.Weight(),
						t.VirtualSize(),
					); err != nil {
						errCh <- fmt.Errorf("pgTxWriter3: %v", err)
					}
				}
				// It can still be a dupe if we are catching up and the
				// cache is empty, which is why we warmupCache.
			}

			if bstmt != nil {
				if _, err := bstmt.Exec(
					tr.blockId,
					tr.n,
					tr.id,
				); err != nil {
					errCh <- fmt.Errorf("pgTxWriter4: %v", err)
				}
			}
		}
	}
	// The counter is raised here and not inside the worker, otherwise
	// wg.Wait() could return before a worker even got to run.
	startWorkers := func() {
		for i := 0; i < nWorkers; i++ {
			wg.Add(1)
			go worker()
		}
	}

	// initial start workers
	startWorkers()

	for tr := range c {
		if tr == nil || tr.tx == nil { // commit signal
			// close channel, wait for workers to finish
			close(wch)
			wg.Wait()
			// commit
			if err := commit(stmt, txn, nil); err != nil {
				errCh <- fmt.Errorf("pgTxWriter5: %v", err)
			}
			if err := commit(bstmt, btxn, nil); err != nil {
				errCh <- fmt.Errorf("pgTxWriter6: %v", err)
			}
			txn, stmt, err = begin(db, "txs", cols)
			if err != nil {
				errCh <- fmt.Errorf("pgTxWriter7: %v", err)
			}
			btxn, bstmt, err = begin(db, "block_txs", bcols)
			if err != nil {
				errCh <- fmt.Errorf("pgTxWriter8: %v", err)
			}
			if tr != nil && tr.sync != nil {
				tr.sync <- true
			}
			// make a new channel, restart workers
			wch = make(chan *txRec, 4)
			startWorkers()
			continue
		}
		wch <- tr
	}

	close(wch)
	wg.Wait()

	log.Printf("Tx writer channel closed, committing transaction.")
	if err := commit(stmt, txn, nil); err != nil {
		errCh <- fmt.Errorf("pgTxWriter9: %v", err)
	}
	if err := commit(bstmt, btxn, nil); err != nil {
		errCh <- fmt.Errorf("pgTxWriter10: %v", err)
	}
	log.Printf("Tx writer done.")
}

// pgTxInWriter copies the input records received on c into the txins
// table using a pool of workers. Inputs whose previous transaction id
// is not in the cache are recorded as prevout misses: written to the
// database right away during first import, otherwise collected and
// resolved at commit time. A nil record, or one without a txIn, is a
// commit signal (see pgBlockWriter).
func pgTxInWriter(c chan *txInRec, errCh chan<- error, db *sql.DB, firstImport bool) {
	defer writerWg.Done()

	cols := []string{"tx_id", "n", "prevout_tx_id", "prevout_n", "scriptsig", "sequence", "witness"}

	txn, stmt, err := begin(db, "txins", cols)
	if err != nil {
		errCh <- fmt.Errorf("pgTxInWriter1: %v", err)
		return
	}

	var missMu sync.Mutex // guards misses, which all workers append to
	misses := make([]*prevoutMiss, 0, 2000)

	// Parallelizing txins provides a small speed up, especially
	// during first import when we write misses to the db directly. We
	// also have a dedicated worker for prevout misses during
	// firstImport.
	nWorkers := 8
	wch := make(chan *txInRec, 4)
	mch := make(chan *prevoutMiss, 64)
	var (
		wg sync.WaitGroup
		mg sync.WaitGroup
	)

	missWriter := func() { // prevoutMiss recorder
		defer mg.Done()
		for miss := range mch {
			// write it to the DB directly
			if stmt != nil {
				if err := recordPrevoutMiss(db, miss.txId, miss.n, miss.prevOutHash); err != nil {
					errCh <- fmt.Errorf("pgTxInWriter2: %v", err)
				}
			}
		}
	}

	worker := func() {
		defer wg.Done()
		for tr := range wch {

			t := tr.txIn

			var wb interface{}
			if t.Witness != nil {
				var b bytes.Buffer
				blkchain.BinWrite(&t.Witness, &b)
				wb = b.Bytes()
			}

			var prevOutTxId *int64 = nil
			if t.PrevOut.N != 0xffffffff { // coinbase
				prevOutTxId = tr.idCache.check(t.PrevOut.Hash)

				if prevOutTxId == nil { // cache miss
					miss := &prevoutMiss{tr.txId, tr.n, t.PrevOut.Hash}
					if firstImport {
						mch <- miss
					} else {
						// remember it, it will be written just when needed
						missMu.Lock()
						misses = append(misses, miss)
						missMu.Unlock()
					}
				}
			}

			if stmt != nil {
				if _, err := stmt.Exec(
					tr.txId,
					tr.n,
					prevOutTxId,
					int32(t.PrevOut.N),
					t.ScriptSig,
					int32(t.Sequence),
					wb,
				); err != nil {
					errCh <- fmt.Errorf("pgTxInWriter3: %v", err)
				}
			}
		}
	}

	// The counters are raised here and not inside the goroutines,
	// otherwise Wait() could return before a goroutine even got to run.
	startWorkers := func() {
		mg.Add(1)
		go missWriter()
		for i := 0; i < nWorkers; i++ {
			wg.Add(1)
			go worker()
		}
	}

	// initial start workers
	startWorkers()

	for tr := range c {
		if tr == nil || tr.txIn == nil { // commit signal
			// close channel, wait for workers to finish
			close(wch)
			wg.Wait()
			close(mch)
			mg.Wait()
			// commit
			if err := commit(stmt, txn, misses); err != nil {
				errCh <- fmt.Errorf("pgTxInWriter4: %v", err)
			}
			misses = misses[:0]
			txn, stmt, err = begin(db, "txins", cols)
			if err != nil {
				errCh <- fmt.Errorf("pgTxInWriter5: %v", err)
			}
			if tr != nil && tr.sync != nil {
				tr.sync <- true
			}
			// make new channels, restart workers
			mch = make(chan *prevoutMiss, 64)
			wch = make(chan *txInRec, 4)
			startWorkers()
			continue
		}
		wch <- tr
	}

	close(wch)
	wg.Wait()
	close(mch)
	mg.Wait()

	log.Printf("TxIn writer channel closed, committing transaction.")
	if err := commit(stmt, txn, misses); err != nil {
		errCh <- fmt.Errorf("pgTxInWriter6: %v", err)
	}
	log.Printf("TxIn writer done.")
}

// pgTxOutWriter copies the output records received on c into the
// txouts table using a pool of workers. With a non-nil utxo checker an
// output is stored as spent unless the checker reports it as a UTXO. A
// nil record, or one without a txOut, is a commit signal (see
// pgBlockWriter).
func pgTxOutWriter(c chan *txOutRec, errCh chan<- error, db *sql.DB, utxo isUTXOer) {
	defer writerWg.Done()

	cols := []string{"tx_id", "n", "value", "scriptpubkey", "spent"}

	txn, stmt, err := begin(db, "txouts", cols)
	if err != nil {
		errCh <- fmt.Errorf("pgTxOutWriter1: %v", err)
		return
	}

	// The call to IsUTXO() is very time consuming because it involves
	// LevelDb, parallelizing here provides a huge speed up.
	nWorkers := 8
	wch := make(chan *txOutRec, 4)
	var wg sync.WaitGroup
	worker := func() {
		defer wg.Done()
		for tr := range wch {
			var spent bool
			if utxo != nil {
				// NB: Some early unspent coins are not in the UTXO set,
				// probably because they are not spendable? So the spent
				// flag could also be interpeted as unspendable.
				isUTXO, err := utxo.IsUTXO(tr.hash, uint32(tr.n))
				if err != nil {
					errCh <- fmt.Errorf("pgTxOutWriter2: %v", err)
				}
				spent = !isUTXO
			}

			if stmt != nil {
				t := tr.txOut
				if _, err := stmt.Exec(
					tr.txId,
					tr.n,
					t.Value,
					t.ScriptPubKey,
					spent,
				); err != nil {
					errCh <- fmt.Errorf("pgTxOutWriter3: %v", err)
				}
			}
		}
	}
	// The counter is raised here and not inside the worker, otherwise
	// wg.Wait() could return before a worker even got to run.
	startWorkers := func() {
		for i := 0; i < nWorkers; i++ {
			wg.Add(1)
			go worker()
		}
	}

	// initial start workers
	startWorkers()

	for tr := range c {
		if tr == nil || tr.txOut == nil { // commit signal
			// close channel, wait for workers to finish
			close(wch)
			wg.Wait()
			// commit
			if err := commit(stmt, txn, nil); err != nil {
				errCh <- fmt.Errorf("pgTxOutWriter4: %v", err)
			}
			txn, stmt, err = begin(db, "txouts", cols)
			if err != nil {
				errCh <- fmt.Errorf("pgTxOutWriter5: %v", err)
			}
			if tr != nil && tr.sync != nil {
				tr.sync <- true
			}
			// make a new channel, restart workers
			wch = make(chan *txOutRec, 4)
			startWorkers()
			continue
		}
		wch <- tr
	}

	close(wch)
	wg.Wait()

	log.Printf("TxOut writer channel closed, committing transaction.")
	if err := commit(stmt, txn, nil); err != nil {
		errCh <- fmt.Errorf("pgTxOutWriter6: %v", err)
	}
	log.Printf("TxOut writer done.")
}

// begin starts a transaction with all constraints deferred and prepares
// a COPY statement into the given table. With a nil db it returns all
// nils, which the writers treat as "do not write anything".
func begin(db *sql.DB, table string, cols []string) (*sql.Tx, *sql.Stmt, error) {
	if db == nil {
		return nil, nil, nil
	}

	txn, err := db.Begin()
	if err != nil {
		return nil, nil, err
	}
	if _, err := txn.Exec("SET CONSTRAINTS ALL DEFERRED"); err != nil {
		_ = txn.Rollback() // give the connection back, the Exec error is what matters
		return nil, nil, err
	}

	stmt, err := txn.Prepare(pq.CopyIn(table, cols...))
	if err != nil {
		_ = txn.Rollback() // give the connection back, the Prepare error is what matters
		return nil, nil, err
	}
	return txn, stmt, nil
}

// commit flushes the COPY statement, records and resolves the given
// prevout misses (if any) and commits the transaction. A nil stmt is a
// no-op. The transaction is rolled back if anything fails or if
// commits are disabled because of prior errors.
func commit(stmt *sql.Stmt, txn *sql.Tx, misses []*prevoutMiss) (err error) {
	if stmt == nil {
		return nil
	}

	// Never leave the transaction (and its connection) dangling. After
	// a failed Commit the Rollback is a harmless no-op.
	defer func() {
		if err != nil {
			_ = txn.Rollback()
		}
	}()

	if _, err = stmt.Exec(); err != nil {
		return err
	}
	if err = stmt.Close(); err != nil {
		return err
	}

	// We do this here because it is not possible to use Exec() during
	// a pq.CopyIn operation, but we do want this done prior to the
	// Commit. Note that misses is always empty during first import,
	// which writes misses to the db directly, bypassing misses slice.
	if len(misses) > 0 {
		if err = recordPrevoutMisses(txn, misses); err != nil {
			return err
		}
		// TODO: Make the parallel argument below configurable?
		if err = fixPrevoutTxId(txn, 4, false); err != nil { // also truncates _prevout_miss
			return err
		}
	}

	if errors {
		log.Printf("Not committing transaction because of prior errors.")
		return fmt.Errorf("Not committing transaction because of prior errors.")
	}
	return txn.Commit()
}

// getHeightAndHashes returns the hashes of the blocks within the last
// back heights, keyed by height. It returns a nil map when db is nil or
// the blocks table is empty.
func getHeightAndHashes(db *sql.DB, back int) (map[int][]blkchain.Uint256, error) {
	if db == nil {
		return nil, nil
	}

	stmt := `SELECT height, hash FROM blocks
             WHERE height > (SELECT MAX(height) FROM blocks) - $1`

	rows, err := db.Query(stmt, back)
	if err != nil {
		return nil, fmt.Errorf("getHeightAndHashes: %v", err)
	}
	defer rows.Close()

	hashes := make(map[int][]blkchain.Uint256)

	for rows.Next() {
		var (
			height int
			hash   blkchain.Uint256
		)
		if err := rows.Scan(&height, &hash); err != nil {
			return nil, err
		}
		hashes[height] = append(hashes[height], hash)
	}
	// A failed iteration must not pass for a complete result.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	if len(hashes) == 0 {
		return nil, nil
	}
	return hashes, nil
}

// getLastBlockId returns the highest block id, or 0 when db is nil or
// there are no blocks.
func getLastBlockId(db *sql.DB) (int, error) {
	if db == nil {
		return 0, nil
	}

	rows, err := db.Query("SELECT id FROM blocks ORDER BY id DESC LIMIT 1")
	if err != nil {
		return 0, err
	}
	defer rows.Close()

	if !rows.Next() {
		return 0, rows.Err()
	}
	var id int
	if err := rows.Scan(&id); err != nil {
		return 0, err
	}
	return id, nil
}

// getLastTxId returns the highest transaction id, or 0 when db is nil
// or there are no transactions.
func getLastTxId(db *sql.DB) (int64, error) {
	if db == nil {
		return 0, nil
	}

	rows, err := db.Query("SELECT id FROM txs ORDER BY id DESC LIMIT 1")
	if err != nil {
		return 0, err
	}
	defer rows.Close()

	if !rows.Next() {
		return 0, rows.Err()
	}
	var id int64
	if err := rows.Scan(&id); err != nil {
		return 0, err
	}
	return id, nil
}

// This could be a db connection or a transaction
type execer interface {
	Exec(query string, args ...interface{}) (sql.Result, error)
	Query(query string, args ...any) (*sql.Rows, error)
}

// createPrevoutMissTable creates the _prevout_miss scratch table if
// necessary and empties it.
func createPrevoutMissTable(db execer) error {
	stmt := `
  CREATE UNLOGGED TABLE IF NOT EXISTS _prevout_miss (
     id           SERIAL NOT NULL PRIMARY KEY
    ,tx_id        BIGINT NOT NULL
    ,n            INT NOT NULL
    ,prevout_hash BYTEA NOT NULL
  );
  TRUNCATE _prevout_miss RESTART IDENTITY;
`
	_, err := db.Exec(stmt)
	return err
}

// clearPrevoutMissTable empties the _prevout_miss table and restarts
// its id sequence.
func clearPrevoutMissTable(db execer) error {
	_, err := db.Exec("TRUNCATE _prevout_miss RESTART IDENTITY")
	return err
}

// dropPrevoutMissTable drops the _prevout_miss table if it exists.
func dropPrevoutMissTable(db *sql.DB) error {
	_, err := db.Exec("DROP TABLE IF EXISTS _prevout_miss")
	return err
}

// prevoutMissMaxId returns the largest id in _prevout_miss, or 0 when
// the table is empty.
func prevoutMissMaxId(db execer) (int, error) {
	rows, err := db.Query(`SELECT MAX(id) FROM _prevout_miss`)
	if err != nil {
		return 0, err
	}
	defer rows.Close()

	// MAX() over an empty table is NULL, which cannot be scanned into
	// a plain int.
	var maxId sql.NullInt64
	for rows.Next() {
		if err := rows.Scan(&maxId); err != nil {
			return 0, err
		}
	}
	if err := rows.Err(); err != nil {
		return 0, err
	}
	if !maxId.Valid {
		return 0, nil
	}
	return int(maxId.Int64), nil
}

// recordPrevoutMiss inserts a single row into _prevout_miss.
func recordPrevoutMiss(db *sql.DB, txId int64, n int, prevoutHash blkchain.Uint256) error {
	stmt := "INSERT INTO _prevout_miss (tx_id, n, prevout_hash) VALUES($1, $2, $3)"
	_, err := db.Exec(stmt, txId, n, prevoutHash[:])
	return err
}

// prevoutMiss identifies a transaction input (txId, n) whose previous
// transaction, known only by its hash, was not found in the id cache.
type prevoutMiss struct {
	txId        int64
	n           int
	prevOutHash blkchain.Uint256
}

// recordPrevoutMisses copies the given misses into _prevout_miss within
// the given transaction. Written entries of the slice are set to nil.
func recordPrevoutMisses(txn *sql.Tx, misses []*prevoutMiss) error {
	stmt, err := txn.Prepare(pq.CopyIn("_prevout_miss", "tx_id", "n", "prevout_hash"))
	if err != nil {
		return err
	}

	for i, m := range misses {
		if _, err = stmt.Exec(m.txId, m.n, m.prevOutHash[:]); err != nil {
			return err
		}
		misses[i] = nil // so that they can be freed
	}
	if _, err = stmt.Exec(); err != nil {
		return err
	}
	return stmt.Close()
}

// Most of the prevout_tx_id's should be already set during the
// import, but we need to correct the remaining ones. You can likely
// avoid it by increasing the txIdCache size. After the first import
// it will be looked up on the fly as needed, but a larger cache is
// still important. We run it in parallel to speed things up.
func fixPrevoutTxId(db execer, parallel int, verbose bool) error { if parallel == 0 { parallel = 1 } stmt := ` UPDATE txins i SET prevout_tx_id = t.id FROM _prevout_miss m JOIN txs t ON m.prevout_hash = t.txid WHERE m.id >= $1 AND m.id < $2 AND i.tx_id = m.tx_id AND i.n = m.n` max, err := prevoutMissMaxId(db) if err != nil { return err } if verbose { log.Printf(" max prevoutMiss id: %d parallel: %d", max, parallel) } if max == 0 { return nil // nothing to do } step := max / (parallel - 1) // -1 because we do not one left over if step > 10000 { step = 10000 // kinda arbitrary - the idea is to show progress } if step < 10 { step = 10 } // Start workers type startEnd struct{ start, end int } queue := make(chan startEnd) var wg sync.WaitGroup for i := 0; i < parallel; i++ { go func(queue chan startEnd) { wg.Add(1) defer wg.Done() for se := range queue { if verbose { log.Printf(" processing range [%d, %d) of %d...", se.start, se.end, max) } if _, err := db.Exec(stmt, se.start, se.end); err != nil { log.Printf(" range [%d, %d) error: %v", se.start, se.end, err) } } }(queue) } for i := 1; i <= max; i += step { queue <- startEnd{i, i + step} } close(queue) wg.Wait() return clearPrevoutMissTable(db) } func fixPrevoutTxIdAnalyze(db execer) error { _, err := db.Exec(`ANALYZE txins, _prevout_miss, txs`) return err } func createTables(db *sql.DB) error { sqlTables := ` CREATE TABLE blocks ( id INT NOT NULL ,height INT NOT NULL -- not same as id, because orphans. 
,hash BYTEA NOT NULL ,version INT NOT NULL ,prevhash BYTEA NOT NULL ,merkleroot BYTEA NOT NULL ,time INT NOT NULL ,bits INT NOT NULL ,nonce INT NOT NULL ,orphan BOOLEAN NOT NULL DEFAULT false ,size INT NOT NULL ,base_size INT NOT NULL ,weight INT NOT NULL ,virt_size INT NOT NULL ); CREATE TABLE txs ( id BIGINT NOT NULL ,txid BYTEA NOT NULL ,version INT NOT NULL ,locktime INT NOT NULL ,size INT NOT NULL ,base_size INT NOT NULL ,weight INT NOT NULL ,virt_size INT NOT NULL ); CREATE TABLE block_txs ( block_id INT NOT NULL ,n SMALLINT NOT NULL ,tx_id BIGINT NOT NULL ); CREATE TABLE txins ( tx_id BIGINT NOT NULL ,n SMALLINT NOT NULL ,prevout_tx_id BIGINT -- can be NULL for coinbase ,prevout_n SMALLINT NOT NULL ,scriptsig BYTEA NOT NULL ,sequence INT NOT NULL ,witness BYTEA ); CREATE TABLE txouts ( tx_id BIGINT NOT NULL ,n SMALLINT NOT NULL ,value BIGINT NOT NULL ,scriptpubkey BYTEA NOT NULL ,spent BOOL NOT NULL ); ` _, err := db.Exec(sqlTables) return err } // The approach we are taking here is to disable autovacuum during the // first import, then enabling it at the end and let it run. 
Because of hint // bits, the vacuum process will effectively rewrite every page of the table, this is explained here // https://dba.stackexchange.com/questions/130496/is-it-worth-it-to-run-vacuum-on-a-table-that-only-receives-inserts // NB: Setting UNLOGGED/LOGGED does not work as changing this setting rewrites the tables func setTableStorageParams(db *sql.DB) error { for _, table := range []string{"blocks", "txs", "block_txs", "txins", "txouts"} { stmt := fmt.Sprintf(`ALTER TABLE %s SET (autovacuum_enabled=false)`, table) if _, err := db.Exec(stmt); err != nil { return err } } return nil } func resetTableStorageParams(db *sql.DB) error { for _, table := range []string{"blocks", "txs", "block_txs", "txins", "txouts"} { stmt := fmt.Sprintf(`ALTER TABLE %s RESET (autovacuum_enabled)`, table) if _, err := db.Exec(stmt); err != nil { return err } } return nil } func createPgcrypto(db *sql.DB) error { _, err := db.Exec("CREATE EXTENSION IF NOT EXISTS pgcrypto") return err } // Create indexes only necessary to run fixPrevoutTxId(). Since we are // updating txins, a presense of the txins(addr_prefix(... index would // slow the updates down a great deal (updates cause an index update, // which calls the function). func createIndexes1(db *sql.DB, verbose bool) error { var start time.Time if verbose { log.Printf(" Starting txins primary key...") } start = time.Now() if _, err := db.Exec(` DO $$ BEGIN IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage WHERE table_name = 'txins' AND constraint_name = 'txins_pkey') THEN ALTER TABLE txins ADD CONSTRAINT txins_pkey PRIMARY KEY(tx_id, n); END IF; END $$;`); err != nil { return err } if verbose { log.Printf(" ...done in %s. 
Starting txs txid (hash) index...", time.Now().Sub(start).Round(time.Millisecond)) } start = time.Now() if _, err := db.Exec("CREATE UNIQUE INDEX IF NOT EXISTS txs_txid_idx ON txs(txid);"); err != nil { return err } if verbose { log.Printf(" ...done in %s.", time.Now().Sub(start).Round(time.Millisecond)) } return nil } func createIndexes2(db *sql.DB, verbose bool) error { var start time.Time // Adding a constraint or index if it does not exist is a little tricky in PG if verbose { log.Printf(" Starting blocks primary key...") } start = time.Now() if _, err := db.Exec(` DO $$ BEGIN IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage WHERE table_name = 'blocks' AND constraint_name = 'blocks_pkey') THEN ALTER TABLE blocks ADD CONSTRAINT blocks_pkey PRIMARY KEY(id); END IF; END $$;`); err != nil { return err } if verbose { log.Printf(" ...done in %s. Starting blocks prevhash index...", time.Now().Sub(start).Round(time.Millisecond)) } start = time.Now() if _, err := db.Exec("CREATE INDEX IF NOT EXISTS blocks_prevhash_idx ON blocks(prevhash);"); err != nil { return err } if verbose { log.Printf(" ...done in %s. Starting blocks hash index...", time.Now().Sub(start).Round(time.Millisecond)) } start = time.Now() if _, err := db.Exec("CREATE UNIQUE INDEX IF NOT EXISTS blocks_hash_idx ON blocks(hash);"); err != nil { return err } if verbose { log.Printf(" ...done in %s. Starting blocks height index...", time.Now().Sub(start).Round(time.Millisecond)) } start = time.Now() if _, err := db.Exec("CREATE INDEX IF NOT EXISTS blocks_height_idx ON blocks(height);"); err != nil { return err } if verbose { log.Printf(" ...done in %s. 
Starting txs primary key...", time.Now().Sub(start).Round(time.Millisecond)) } start = time.Now() if _, err := db.Exec(` DO $$ BEGIN IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage WHERE table_name = 'txs' AND constraint_name = 'txs_pkey') THEN ALTER TABLE txs ADD CONSTRAINT txs_pkey PRIMARY KEY(id); END IF; END $$;`); err != nil { return err } if verbose { log.Printf(" ...done in %s. Starting block_txs block_id, n primary key...", time.Now().Sub(start).Round(time.Millisecond)) } start = time.Now() if _, err := db.Exec(` DO $$ BEGIN IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage WHERE table_name = 'block_txs' AND constraint_name = 'block_txs_pkey') THEN ALTER TABLE block_txs ADD CONSTRAINT block_txs_pkey PRIMARY KEY(block_id, n); END IF; END $$;`); err != nil { return err } if verbose { log.Printf(" ...done in %s. Starting block_txs tx_id index...", time.Now().Sub(start).Round(time.Millisecond)) } start = time.Now() if _, err := db.Exec("CREATE INDEX IF NOT EXISTS block_txs_tx_id_idx ON block_txs(tx_id);"); err != nil { return err } if verbose { log.Printf(" ...done in %s. Creatng hash_type function...", time.Now().Sub(start).Round(time.Millisecond)) } start = time.Now() if _, err := db.Exec(` CREATE OR REPLACE FUNCTION hash_type(_hash BYTEA) RETURNS TEXT AS $$ BEGIN IF EXISTS (SELECT 1 FROM blocks WHERE hash = _hash) THEN RETURN 'block'; ELSIF EXISTS (SELECT 1 FROM txs WHERE txid = _hash) THEN RETURN 'tx'; END IF; RETURN NULL; END; $$ LANGUAGE plpgsql; `); err != nil { return err } if verbose { log.Printf(" ...done in %s. Starting txins (prevout_tx_id, prevout_tx_n) index...", time.Now().Sub(start).Round(time.Millisecond)) } start = time.Now() if _, err := db.Exec("CREATE INDEX IF NOT EXISTS txins_prevout_tx_id_prevout_n_idx ON txins(prevout_tx_id, prevout_n);"); err != nil { return err } if verbose { log.Printf(" ...done in %s. 
Starting txouts primary key...", time.Now().Sub(start).Round(time.Millisecond)) } start = time.Now() if _, err := db.Exec(` DO $$ BEGIN IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage WHERE table_name = 'txouts' AND constraint_name = 'txouts_pkey') THEN ALTER TABLE txouts ADD CONSTRAINT txouts_pkey PRIMARY KEY(tx_id, n); END IF; END $$;`); err != nil { return err } if verbose { log.Printf(" ...done in %s. Starting txouts address prefix index...", time.Now().Sub(start).Round(time.Millisecond)) } start = time.Now() if _, err := db.Exec(` CREATE OR REPLACE FUNCTION extract_address(scriptPubKey BYTEA) RETURNS BYTEA AS $$ BEGIN IF SUBSTR(scriptPubKey, 1, 3) = E'\\x76a914' THEN -- P2PKH RETURN SUBSTR(scriptPubKey, 4, 20); ELSIF SUBSTR(scriptPubKey, 1, 2) = E'\\xa914' THEN -- P2SH RETURN SUBSTR(scriptPubKey, 3, 20); ELSIF SUBSTR(scriptPubKey, 1, 2) = E'\\x0014' THEN -- P2WPKH RETURN SUBSTR(scriptPubKey, 3, 20); ELSIF SUBSTR(scriptPubKey, 1, 2) = E'\\x0020' THEN -- P2WSH RETURN SUBSTR(scriptPubKey, 3, 32); END IF; RETURN NULL; END; $$ LANGUAGE plpgsql IMMUTABLE; -- Cast the first 8 bytes of a BYTEA as a BIGINT CREATE OR REPLACE FUNCTION bytes2int8(bytes BYTEA) RETURNS BIGINT AS $$ BEGIN RETURN SUBSTR(bytes::text, 2, 16)::bit(64)::bigint; END; $$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE; -- Address prefix (txout) CREATE OR REPLACE FUNCTION addr_prefix(scriptPubKey BYTEA) RETURNS BIGINT AS $$ BEGIN RETURN public.bytes2int8(public.extract_address(scriptPubKey)); END; $$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE; CREATE INDEX IF NOT EXISTS txouts_addr_prefix_tx_id_idx ON txouts(addr_prefix(scriptpubkey), tx_id); `); err != nil { return err } if verbose { log.Printf(" ...done in %s. 
Starting txins address prefix index...", time.Now().Sub(start).Round(time.Millisecond)) } start = time.Now() if _, err := db.Exec(` CREATE OR REPLACE FUNCTION parse_witness(witness BYTEA) RETURNS BYTEA[] AS $$ DECLARE stack BYTEA[]; len INT; pos INT = 1; slen INT; BEGIN IF witness IS NULL OR witness = '' THEN RETURN NULL; END IF; len = GET_BYTE(witness, 0); -- this is a varint, but whatever WHILE len > 0 LOOP slen = GET_BYTE(witness, pos); IF slen = 253 THEN slen = GET_BYTE(witness, pos+1) + GET_BYTE(witness, pos+2)*256; pos = pos+2; -- NB: There is a possibility of a 4-byte compact, but no transaction uses it END IF; stack = stack || SUBSTR(witness, pos+2, slen); pos = pos + slen + 1; len = len - 1; END LOOP; RETURN stack; END; $$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE; CREATE OR REPLACE FUNCTION extract_address(scriptsig BYTEA, witness BYTEA) RETURNS BYTEA AS $$ DECLARE pub BYTEA; sha BYTEA; wits BYTEA[]; len INT; pos INT; op INT; BEGIN IF LENGTH(scriptsig) = 0 OR scriptsig IS NULL THEN -- Native SegWit: P2WSH or P2WPKH wits = public.parse_witness(witness); pub = wits[array_length(wits, 1)]; sha = public.digest(pub, 'sha256'); IF ARRAY_LENGTH(wits, 1) = 2 AND LENGTH(pub) = 33 THEN -- Most likely a P2WPKH RETURN public.digest(sha, 'ripemd160'); ELSE -- Most likely a P2WSH RETURN sha; END IF; ELSE len = GET_BYTE(scriptsig, 0); IF len = LENGTH(scriptsig) - 1 THEN -- Most likely a P2SH (or P2SH-P2W*) RETURN public.digest(public.digest(SUBSTR(scriptsig, 2), 'sha256'), 'ripemd160'); ELSE -- P2PKH or longer P2SH, either way the last thing is what we need pos = 0; WHILE pos < LENGTH(scriptsig)-1 LOOP op = GET_BYTE(scriptsig, pos); IF op > 0 AND op < 76 THEN len = op; pos = pos + 1; ELSEIF op = 76 THEN len = GET_BYTE(scriptsig, pos+1); pos = pos + 2; ELSEIF op = 77 THEN len = GET_BYTE(scriptsig, pos+1) + GET_BYTE(scriptsig, pos+2)*256; pos = pos + 3; ELSE pos = pos + 1; CONTINUE; END IF; pub = SUBSTR(scriptsig, pos+1, len); pos = pos + len; END LOOP; RETURN 
public.digest(public.digest(pub, 'sha256'), 'ripemd160'); END IF; END IF; RETURN NULL; END; $$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE; -- Address prefix (txin) CREATE OR REPLACE FUNCTION addr_prefix(scriptsig BYTEA, witness BYTEA) RETURNS BIGINT AS $$ BEGIN RETURN public.bytes2int8(public.extract_address(scriptsig, witness)); END; $$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE; -- Partial/conditional index because coinbase txin scriptsigs are garbage CREATE INDEX IF NOT EXISTS txins_addr_prefix_tx_id_idx ON txins(addr_prefix(scriptsig, witness), tx_id) WHERE prevout_tx_id IS NOT NULL; `); err != nil { return err } if verbose { log.Printf(" ...done in %s.", time.Now().Sub(start).Round(time.Millisecond)) } return nil } func createConstraints(db *sql.DB, verbose bool) error { var start time.Time if verbose { log.Printf(" Starting block_txs block_id foreign key...") } start = time.Now() if _, err := db.Exec(` DO $$ BEGIN -- NB: table_name is the target/foreign table IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage WHERE table_name = 'blocks' AND constraint_name = 'block_txs_block_id_fkey') THEN ALTER TABLE block_txs ADD CONSTRAINT block_txs_block_id_fkey FOREIGN KEY (block_id) REFERENCES blocks(id) ON DELETE CASCADE NOT VALID; END IF; END $$;`); err != nil { return err } if verbose { log.Printf(" ...done in %s. Starting block_txs tx_id foreign key...", time.Now().Sub(start).Round(time.Millisecond)) } start = time.Now() if _, err := db.Exec(` DO $$ BEGIN -- NB: table_name is the target/foreign table IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage WHERE table_name = 'txs' AND constraint_name = 'block_txs_tx_id_fkey') THEN ALTER TABLE block_txs ADD CONSTRAINT block_txs_tx_id_fkey FOREIGN KEY (tx_id) REFERENCES txs(id) ON DELETE CASCADE NOT VALID DEFERRABLE; END IF; END $$;`); err != nil { return err } if verbose { log.Printf(" ...done in %s. 
Starting txins tx_id foreign key...", time.Now().Sub(start).Round(time.Millisecond)) } start = time.Now() if _, err := db.Exec(` DO $$ BEGIN -- NB: table_name is the target/foreign table IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage WHERE table_name = 'txs' AND constraint_name = 'txins_tx_id_fkey') THEN ALTER TABLE txins ADD CONSTRAINT txins_tx_id_fkey FOREIGN KEY (tx_id) REFERENCES txs(id) ON DELETE CASCADE NOT VALID DEFERRABLE; END IF; END $$;`); err != nil { return err } if verbose { log.Printf(" ...done in %s. Starting txouts tx_id foreign key...", time.Now().Sub(start).Round(time.Millisecond)) } start = time.Now() if _, err := db.Exec(` DO $$ BEGIN -- NB: table_name is the target/foreign table IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage WHERE table_name = 'txs' AND constraint_name = 'txouts_tx_id_fkey') THEN ALTER TABLE txouts ADD CONSTRAINT txouts_tx_id_fkey FOREIGN KEY (tx_id) REFERENCES txs(id) ON DELETE CASCADE NOT VALID DEFERRABLE; END IF; END $$;`); err != nil { return err } if verbose { log.Printf(" ...done in %s.", time.Now().Sub(start).Round(time.Millisecond)) } return nil } // Set the orphan status starting from the highest block and going // backwards, up to limit. If limit is 0, the whole table is updated. // // The WITH RECURSIVE part connects rows by joining prevhash to hash, // thereby building a list which starts at the highest hight and going // towards the beginning until no parent can be found. // // Then we LEFT JOIN the above to the blocks table, and where there is // no match (x.id IS NULL) we mark it as orphan. // // If the chain is split, i.e. there is more than one row at the // highest height, then no blocks in the split will be marked as // orphan, which is fine. 
func (w *PGWriter) SetOrphans(limit int) error { var limitNSql string if limit > 0 { limitNSql = fmt.Sprintf("WHERE n < %d", limit+50) } if _, err := w.db.Exec(fmt.Sprintf(` DO $$ DECLARE min_id INT; BEGIN -- select an id going back limit rows. SELECT MIN(id) INTO min_id FROM (SELECT id FROM blocks ORDER BY id DESC LIMIT %d) x; -- a limit of 0 leaves min_id as NULL, need to correct that IF (min_id IS NULL) THEN min_id = -1; END IF; -- the key idea is that the recursive part goes back further than the -- limit imposed by min_id UPDATE blocks SET orphan = a.orphan FROM ( SELECT blocks.id, x.id IS NULL AS orphan FROM blocks LEFT JOIN ( WITH RECURSIVE recur(id, prevhash, n) AS ( -- non-recursive term, executed once SELECT id, prevhash, 0 AS n FROM blocks -- this should be faster than MAX(height) WHERE height IN (SELECT height FROM blocks ORDER BY height DESC LIMIT 1) UNION ALL -- recursive term, recur refers to previous iteration result -- iteration stops when previous row prevhash finds no match OR -- if n reaches a limit (see limitSql above) SELECT blocks.id, blocks.prevhash, n+1 AS n FROM recur JOIN blocks ON blocks.hash = recur.prevhash %s ) SELECT recur.id, recur.prevhash FROM recur ) x ON blocks.id = x.id ) a WHERE blocks.id = a.id AND blocks.id >= min_id; END $$`, limit, limitNSql)); err != nil { return err } return nil } func createTxinsTriggers(db *sql.DB) error { if _, err := db.Exec(` CREATE OR REPLACE FUNCTION txins_after_trigger_func() RETURNS TRIGGER AS $$ BEGIN IF (TG_OP = 'DELETE') THEN IF OLD.prevout_tx_id IS NOT NULL THEN UPDATE txouts SET spent = FALSE WHERE tx_id = OLD.prevout_tx_id AND n = OLD.prevout_n; END IF; RETURN OLD; ELSIF (TG_OP = 'UPDATE' OR TG_OP = 'INSERT') THEN IF NEW.prevout_tx_id IS NOT NULL THEN UPDATE txouts SET spent = TRUE WHERE tx_id = NEW.prevout_tx_id AND n = NEW.prevout_n; IF NOT FOUND THEN RAISE EXCEPTION 'Unknown tx_id:prevout_n combination: %:%', NEW.prevout_tx_id, NEW.prevout_n; END IF; END IF; RETURN NEW; END IF; RETURN 
NULL; END; $$ LANGUAGE plpgsql PARALLEL SAFE; CREATE CONSTRAINT TRIGGER txins_after_trigger AFTER INSERT OR UPDATE OR DELETE ON txins DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE txins_after_trigger_func(); `); err != nil { return err } return nil } func warmupCache(db *sql.DB, cache *txIdCache, blocks int) error { stmt := ` SELECT t.id, t.txid, o.cnt FROM txs t JOIN LATERAL ( SELECT COUNT(1) AS cnt FROM txouts o WHERE o.tx_id = t.id ) o ON true WHERE id > ( SELECT MIN(tx_id) FROM block_txs bt JOIN ( SELECT id FROM blocks ORDER BY id DESC LIMIT $1 ) b ON b.id = bt.block_id ) ORDER BY t.id; ` rows, err := db.Query(stmt, blocks) if err != nil { return err } defer rows.Close() var ( txid int64 hash blkchain.Uint256 cnt int ) for rows.Next() { if err := rows.Scan(&txid, &hash, &cnt); err != nil { return err } cache.add(hash, txid, cnt) } return nil } func takeSnapshot(db *sql.DB, dataset string, height int, tag string) error { log.Printf("Taking a ZFS snapshot: %s@%d%s", dataset, height, tag) log.Printf(" calling pg_backup_start('blkchain', true)") // pg < 15 use pg_start_backup if _, err := db.Exec(`SELECT pg_backupstart('blkchain', true)`); err != nil { return err } log.Printf(" executing COPY (SELECT) TO PROGRAM 'zfs snapshot %s@%d%s'", dataset, height, tag) if _, err := db.Exec(fmt.Sprintf(`COPY (SELECT) TO PROGRAM 'zfs snapshot %s@%d%s'`, dataset, height, tag)); err != nil { return err } log.Printf(" calling pg_backup_stop()") // pg < 15 use pg_stop_backup if _, err := db.Exec(`SELECT pg_backup_stop()`); err != nil { return err } return nil } // TODO: Presently unused func waitForAutovacuum(db *sql.DB) error { avCount := func() (int, error) { stmt := `SELECT COUNT(1) FROM pg_stat_activity WHERE query LIKE 'autovacuum:%'` rows, err := db.Query(stmt) if err != nil { return 0, err } defer rows.Close() var cnt int for rows.Next() { if err := rows.Scan(&cnt); err != nil { return 0, err } } return cnt, nil } for { if cnt, err := avCount(); err != nil { 
log.Printf("waitForAutovacuum: %v", err) return err } else if cnt == 0 { return nil } time.Sleep(3 * time.Second) } }