package db

import (
	"bytes"
	"database/sql"
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	"github.com/blkchain/blkchain"
	"github.com/lib/pq"
)

// Explanation of how we handle integers. In Bitcoin structures most
// integers are uint32. Postgres does not have an unsigned int type,
// and using a bigint to store them would waste space. So we cast all
// uints to int32, and thus 0xFFFFFFFF becomes -1 in Postgres, which
// is fine because the underlying bits are preserved.
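//
// For illustration only (not importer logic): the cast round-trips,
// so the original value is always recoverable:
//
//	v := uint32(0xFFFFFFFF)
//	stored := int32(v)      // -1 as seen by Postgres
//	back := uint32(stored)  // 0xFFFFFFFF again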

var (
	writerWg sync.WaitGroup
	errors   bool
)

type PGWriter struct {
	blockCh    chan *BlockRec
	wg         *sync.WaitGroup
	db         *sql.DB
	start      time.Time
	zfsDataset string
	parallel   int
}

type isUTXOer interface {
	IsUTXO(blkchain.Uint256, uint32) (bool, error)
}
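
// A minimal in-memory isUTXOer is a useful sketch of what the interface
// expects (illustrative only; the real checker reads the LevelDb UTXO set):
//
//	type mapUTXO map[blkchain.Uint256]map[uint32]bool
//
//	func (m mapUTXO) IsUTXO(hash blkchain.Uint256, n uint32) (bool, error) {
//		return m[hash][n], nil
//	}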

func NewPGWriter(connstr string, cacheSize int, utxo isUTXOer, zfsDataset string, parallel int) (*PGWriter, error) {

	start := time.Now()

	var (
		wg sync.WaitGroup

		firstImport = true

		db  *sql.DB
		err error
	)

	if connstr != "nulldb" {
		db, err = sql.Open("postgres", connstr)
		if err != nil {
			return nil, err
		}

		if err := createPgcrypto(db); err != nil {
			return nil, err
		}

		if err := createTables(db); err != nil {
			if strings.Contains(err.Error(), "already exists") {
				// This is fine; cancel deferred index/constraint creation.
				firstImport = false
				log.Printf("Tables already exist, catching up.")
			} else {
				return nil, err
			}
		}

		if firstImport {
			if utxo == nil {
				return nil, fmt.Errorf("First import must be done with UTXO checker, i.e. from LevelDb directly. (utxo == nil)")
			}
			log.Printf("Tables created without indexes, which are created at the very end.")
			log.Printf("Setting table parameters: autovacuum_enabled=false")
			if err := setTableStorageParams(db); err != nil {
				return nil, err
			}
		}

		if err := createPrevoutMissTable(db); err != nil {
			return nil, err
		}
	}

	bch := make(chan *BlockRec, 2)
	wg.Add(1)

	w := &PGWriter{
		blockCh:    bch,
		wg:         &wg,
		db:         db,
		start:      start,
		zfsDataset: zfsDataset,
		parallel:   parallel,
	}

	go w.pgBlockWorker(bch, &wg, firstImport, cacheSize, utxo)

	return w, nil
}
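
// Typical construction and teardown, sketched under the assumption that
// connstr and utxoChecker come from the caller (the cache size and
// parallelism here are arbitrary):
//
//	w, err := NewPGWriter(connstr, 10_000_000, utxoChecker, "", 4)
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer w.Close()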

func (p *PGWriter) Close() {
	close(p.blockCh)
	p.wg.Wait()
}

func (p *PGWriter) Uptime() time.Duration {
	return time.Since(p.start)
}

func (p *PGWriter) WriteBlock(b *BlockRec, sync bool) error {
	if sync {
		b.sync = make(chan bool)
	}
	p.blockCh <- b
	if sync {
		if ok := <-b.sync; !ok {
			log.Printf("Error writing block: %v", b.Block.Hash())
			return fmt.Errorf("Error writing block: %v", b.Block.Hash())
		}
	}
	return nil
}
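
// A caller can request a synchronous write, in which case WriteBlock
// does not return until the block is fully committed (illustrative):
//
//	if err := w.WriteBlock(rec, true); err != nil {
//		log.Printf("block not written: %v", err)
//	}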

func (w *PGWriter) HeightAndHashes(back int) (map[int][]blkchain.Uint256, error) {
	return getHeightAndHashes(w.db, back)
}

func (w *PGWriter) pgBlockWorker(ch <-chan *BlockRec, wg *sync.WaitGroup, firstImport bool, cacheSize int, utxo isUTXOer) {
	defer wg.Done()

	bid, err := getLastBlockId(w.db)
	if err != nil {
		log.Printf("Error getting last block id, exiting: %v", err)
		return
	}
	txid, err := getLastTxId(w.db)
	if err != nil {
		log.Printf("Error getting last tx id, exiting: %v", err)
		return
	}

	errCh := make(chan error, 1)
	go func() {
		err := <-errCh
		log.Printf("ERROR (all commits disabled now): %v", err)
		errors = true
	}()

	// Register the four writer goroutines before starting them, so that
	// writerWg.Wait() cannot run ahead of the Add.
	writerWg.Add(4)

	blockCh := make(chan *BlockRec, 2)
	go pgBlockWriter(blockCh, errCh, w.db)

	txCh := make(chan *txRec, 64)
	go pgTxWriter(txCh, errCh, w.db)

	txInCh := make(chan *txInRec, 64)
	go pgTxInWriter(txInCh, errCh, w.db, firstImport)

	txOutCh := make(chan *txOutRec, 64)
	go pgTxOutWriter(txOutCh, errCh, w.db, utxo)

	hashes, err := getHeightAndHashes(w.db, 1)
	if err != nil {
		log.Printf("Error getting last hash and height, exiting: %v", err)
		return
	}

	// A nil utxo means this is coming from a btcnode; we do not need to skip blocks.
	if utxo != nil && len(hashes) > 0 {
		var bhash blkchain.Uint256
		for _, hh := range hashes {
			bhash = hh[len(hh)-1] // the last hash at the tip height
		}
		log.Printf("PGWriter ignoring blocks up to hash %v", bhash)
		skip, last := 0, time.Now()
		for b := range ch {
			hash := b.Block.Hash()
			if bhash == hash {
				break
			}
			skip++
			if skip%10 == 0 && time.Since(last) > 5*time.Second {
				log.Printf(" - ignored %d blocks...", skip)
				last = time.Now()
			}
		}
		if skip > 0 {
			log.Printf("Ignored %d total blocks.", skip)
		}
	}

	idCache := newTxIdCache(cacheSize)

	var syncCh chan bool
	if !firstImport {
		// No firstImport means that the constraints already exist, and
		// we need to wait for a tx to be committed before ins/outs can
		// be inserted. Same with block/tx.
		syncCh = make(chan bool)

		// The cache must be warmed up in the (rare) case we encounter a
		// chain split soon after we start, to avoid duplicate txid key
		// errors.
		nBlocks := 6
		log.Printf("Warming up the txid cache going back %d blocks...", nBlocks)
		if err := warmupCache(w.db, idCache, nBlocks); err != nil {
			log.Printf("Error warming up the idCache: %v", err)
		}
		log.Printf("Warming up the cache done.")
	}

	txcnt, start, lastStatus, lastCacheStatus, lastHeight := 0, time.Now(), time.Now(), 0, -1
	blkCnt, blkSz, batchSz := 0, 0, 0
	for br := range ch {
		bid++
		blkCnt++

		br.Id = bid
		br.Hash = br.Block.Hash()

		if br.Height < 0 { // We have to look it up.

			hashes, err := getHeightAndHashes(w.db, 5)
			if err != nil {
				log.Printf("pgBlockWorker() error: %v", err)
			}

		hloop:
			for height, hh := range hashes {
				for _, h := range hh {
					if h == br.PrevHash {
						br.Height = height + 1
						break hloop
					}
				}
			}

			if br.Height < 0 {
				log.Printf("pgBlockWorker: Could not connect block to a previous block on our chain, ignoring it.")
				if br.sync != nil {
					br.sync <- false
				}
				continue
			}
		}
		lastHeight = br.Height

		blkSz += br.Size()
		batchSz += br.Size()
		blockCh <- br

		for n, tx := range br.Txs {
			txid++
			txcnt++

			hash := tx.Hash()

			// Check whether recently seen, and add to the cache.
			recentId := idCache.add(hash, txid, len(tx.TxOuts))
			txCh <- &txRec{
				id:      recentId,
				n:       n,
				blockId: bid,
				tx:      tx,
				hash:    hash,
				dupe:    recentId != txid, // dupes are not written but referred to in block_txs
			}

			if recentId != txid {
				// This is a recent transaction, nothing to do.
				continue
			}

			for n, txin := range tx.TxIns {
				txInCh <- &txInRec{
					txId:    txid,
					n:       n,
					txIn:    txin,
					idCache: idCache,
				}
			}

			for n, txout := range tx.TxOuts {
				txOutCh <- &txOutRec{
					txId:  txid,
					n:     n,
					txOut: txout,
					hash:  hash,
				}
			}
			if errors {
				log.Printf("Errors detected, aborting.")
				break
			}
		}

		if errors {
			if br.sync != nil {
				br.sync <- false
			}
			break
		}

		if !firstImport {
			// Commit after every block.
			blockCh <- &BlockRec{
				sync: syncCh,
			}
			if syncCh != nil {
				<-syncCh
			}
			txCh <- &txRec{
				sync: syncCh,
			}
			if syncCh != nil {
				<-syncCh
			}
			// NB: Outputs must be committed before inputs!
			txOutCh <- &txOutRec{
				sync: syncCh,
			}
			if syncCh != nil {
				<-syncCh
			}
			if br.sync != nil {
				// Wait for it to finish.
				txInCh <- &txInRec{
					sync: syncCh,
				}
				if syncCh != nil {
					<-syncCh
					br.sync <- true
				}
			} else {
				// We don't care when it finishes.
				txInCh <- nil
			}
		} else if bid%1024 == 0 {
			// Commit every N blocks,
			if batchSz > 100_000_000 { // but not below a minimum batch size.
				batchSz = 0
				blockCh <- nil
				txCh <- nil
				txInCh <- nil
				txOutCh <- nil
			}
		}

		// Report progress.
		if time.Since(lastStatus) > 5*time.Second {
			uptime := w.Uptime().Round(time.Second)
			log.Printf("Height: %d Txs: %d Time: %v Tx/s: %02f KB/s: %02f Runtime: %s",
				br.Height, txcnt,
				time.Unix(int64(br.Time), 0),
				float64(txcnt)/time.Since(start).Seconds(),
				float64(blkSz/1024)/time.Since(start).Seconds(),
				uptime)
			lastStatus = time.Now()
			lastCacheStatus++
			if lastCacheStatus == 5 {
				idCache.reportStats(false)
				lastCacheStatus = 0
			}
		}
	}

	close(blockCh)
	close(txInCh)
	close(txOutCh)
	close(txCh)

	log.Printf("Closed db channels, waiting for workers to finish...")
	writerWg.Wait()
	log.Printf("Workers finished.")

	if blkCnt == 0 {
		return
	}

	idCache.reportStats(true) // Final stats one last time.

	if errors || w.db == nil {
		return
	}

	verbose := firstImport

	if firstImport {
		idCache.clear()
		log.Printf("Cleared the cache.")

		// First, only the indexes necessary to run fixPrevoutTxId().
		log.Printf("Creating indexes part 1, please be patient, this may take a long time...")
		if err := createIndexes1(w.db, verbose); err != nil {
			log.Printf("Error creating indexes: %v", err)
		}

		if len(w.zfsDataset) > 0 {
			takeSnapshot(w.db, w.zfsDataset, lastHeight, "-preindex")
		}

		if idCache.miss > 0 {
			log.Printf("Running ANALYZE txins, _prevout_miss, txs to ensure the next step selects the optimal plan...")
			start := time.Now()
			if err := fixPrevoutTxIdAnalyze(w.db); err != nil {
				log.Printf("Error running ANALYZE: %v", err)
			}
			log.Printf("...done in %s. Fixing missing prevout_tx_id entries (if needed), this may take a long time...",
				time.Since(start).Round(time.Millisecond))
			start = time.Now()
			if err := fixPrevoutTxId(w.db, w.parallel, true); err != nil {
				log.Printf("Error fixing prevout_tx_id: %v", err)
			}
			log.Printf("...done in %s.", time.Since(start).Round(time.Millisecond))
		} else {
			log.Printf("NOT fixing missing prevout_tx_id entries because there were 0 cache misses.")
		}

		// Now the rest of the indexes.
		log.Printf("Creating indexes part 2, please be patient, this may take a long time...")
		if err := createIndexes2(w.db, verbose); err != nil {
			log.Printf("Error creating indexes: %v", err)
		}

		// Finally, the constraints.
		log.Printf("Creating constraints (if needed), please be patient, this may take a long time...")
		if err := createConstraints(w.db, verbose); err != nil {
			log.Printf("Error creating constraints: %v", err)
		}

		// NOTE: It is imperative that this trigger is created *after* the
		// fixPrevoutTxId() call, or else it would fire needlessly and slow
		// fixPrevoutTxId() tremendously. The trigger sets the spent column,
		// which should already be set correctly during the initial import
		// based on the LevelDb UTXO set.
		log.Printf("Creating txins triggers.")
		if err := createTxinsTriggers(w.db); err != nil {
			log.Printf("Error creating txins triggers: %v", err)
		}
	}

	log.Printf("Dropping _prevout_miss table.")
	if err := dropPrevoutMissTable(w.db); err != nil {
		log.Printf("Error dropping _prevout_miss table: %v", err)
	}

	orphanLimit, start := 0, time.Now()
	if !firstImport {
		// No need to walk back the entire chain.
		orphanLimit = blkCnt + 50
		log.Printf("Marking orphan blocks (going back %d blocks)...", orphanLimit)
	} else {
		log.Printf("Marking orphan blocks (whole chain)...")
	}
	if err := w.SetOrphans(orphanLimit); err != nil {
		log.Printf("Error marking orphans: %v", err)
	}
	log.Printf("Done marking orphan blocks in %s.", time.Since(start).Round(time.Millisecond))

	if firstImport {
		if err := resetTableStorageParams(w.db); err != nil {
			log.Printf("Error resetting storage parameters: %v", err)
		}
		log.Printf("Reset table storage parameters: autovacuum_enabled.")
		log.Printf("Indexes and constraints created.")
		if len(w.zfsDataset) > 0 {
			takeSnapshot(w.db, w.zfsDataset, lastHeight, "-postindex")
		}
	}
}

func pgBlockWriter(c chan *BlockRec, errCh chan<- error, db *sql.DB) {
	defer writerWg.Done()

	cols := []string{"id", "height", "hash", "version", "prevhash", "merkleroot", "time", "bits", "nonce", "orphan", "size", "base_size", "weight", "virt_size"}

	txn, stmt, err := begin(db, "blocks", cols)
	if err != nil {
		errCh <- fmt.Errorf("pgBlockWriter1: %v", err)
		return
	}

	for br := range c {
		if br == nil || br.Block == nil { // commit signal
			if err := commit(stmt, txn, nil); err != nil {
				errCh <- fmt.Errorf("Block commit error: %v", err)
			}
			txn, stmt, err = begin(db, "blocks", cols)
			if err != nil {
				errCh <- fmt.Errorf("pgBlockWriter2: %v", err)
			}
			if br != nil && br.sync != nil {
				br.sync <- true
			}
			continue
		}

		if stmt != nil {
			b := br.Block
			if _, err := stmt.Exec(
				br.Id,
				br.Height,
				br.Hash[:],
				int32(b.Version),
				b.PrevHash[:],
				b.HashMerkleRoot[:],
				int32(b.Time),
				int32(b.Bits),
				int32(b.Nonce),
				br.Orphan,
				br.Size(),
				br.BaseSize(),
				br.Weight(),
				br.VirtualSize(),
			); err != nil {
				errCh <- fmt.Errorf("pgBlockWriter3: %v", err)
				return
			}
		}
	}

	log.Printf("Block writer channel closed, committing transaction.")
	if err = commit(stmt, txn, nil); err != nil {
		errCh <- fmt.Errorf("pgBlockWriter4: %v", err)
	}
	log.Printf("Block writer done.")
}

func pgTxWriter(c chan *txRec, errCh chan<- error, db *sql.DB) {
	defer writerWg.Done()

	cols := []string{"id", "txid", "version", "locktime", "size", "base_size", "weight", "virt_size"}
	bcols := []string{"block_id", "n", "tx_id"}

	txn, stmt, err := begin(db, "txs", cols)
	if err != nil {
		errCh <- fmt.Errorf("pgTxWriter1: %v", err)
		return
	}

	btxn, bstmt, err := begin(db, "block_txs", bcols)
	if err != nil {
		errCh <- fmt.Errorf("pgTxWriter2: %v", err)
		return
	}

	// It's not clear that parallelizing txs provides any speed
	// advantage, but whatever.

	nWorkers := 3
	wch := make(chan *txRec, 4)
	var wg sync.WaitGroup
	worker := func() {
		defer wg.Done()
		for tr := range wch {

			if !tr.dupe {
				// It can still be a dupe if we are catching up and the
				// cache is empty, which is why we warmupCache.
				if stmt != nil {
					t := tr.tx
					if _, err := stmt.Exec(
						tr.id,
						tr.hash[:],
						int32(t.Version),
						int32(t.LockTime),
						t.Size(),
						t.BaseSize(),
						t.Weight(),
						t.VirtualSize(),
					); err != nil {
						errCh <- fmt.Errorf("pgTxWriter3: %v", err)
					}
				}
			}

			if bstmt != nil {
				if _, err := bstmt.Exec(
					tr.blockId,
					tr.n,
					tr.id,
				); err != nil {
					errCh <- fmt.Errorf("pgTxWriter4: %v", err)
				}
			}

		}
	}

	// Initial worker start. NB: Add must happen before the goroutine
	// starts, or wg.Wait could run before the worker registers itself.
	for i := 0; i < nWorkers; i++ {
		wg.Add(1)
		go worker()
	}

	for tr := range c {
		if tr == nil || tr.tx == nil { // commit signal
			// Close the channel, wait for the workers to finish.
			close(wch)
			wg.Wait()
			// Commit.
			if err := commit(stmt, txn, nil); err != nil {
				errCh <- fmt.Errorf("pgTxWriter5: %v", err)
			}
			if err = commit(bstmt, btxn, nil); err != nil {
				errCh <- fmt.Errorf("pgTxWriter6: %v", err)
			}
			var err error
			txn, stmt, err = begin(db, "txs", cols)
			if err != nil {
				errCh <- fmt.Errorf("pgTxWriter7: %v", err)
			}
			btxn, bstmt, err = begin(db, "block_txs", bcols)
			if err != nil {
				errCh <- fmt.Errorf("pgTxWriter8: %v", err)
			}
			if tr != nil && tr.sync != nil {
				tr.sync <- true
			}
			// Make a new channel, restart the workers.
			wch = make(chan *txRec, 4)
			for i := 0; i < nWorkers; i++ {
				wg.Add(1)
				go worker()
			}
			continue
		}
		wch <- tr
	}
	close(wch)
	wg.Wait()

	log.Printf("Tx writer channel closed, committing transaction.")

	if err := commit(stmt, txn, nil); err != nil {
		errCh <- fmt.Errorf("pgTxWriter9: %v", err)
	}
	if err := commit(bstmt, btxn, nil); err != nil {
		errCh <- fmt.Errorf("pgTxWriter10: %v", err)
	}

	log.Printf("Tx writer done.")
}

func pgTxInWriter(c chan *txInRec, errCh chan<- error, db *sql.DB, firstImport bool) {
	defer writerWg.Done()

	cols := []string{"tx_id", "n", "prevout_tx_id", "prevout_n", "scriptsig", "sequence", "witness"}

	txn, stmt, err := begin(db, "txins", cols)
	if err != nil {
		errCh <- fmt.Errorf("pgTxInWriter1: %v", err)
		return
	}

	misses := make([]*prevoutMiss, 0, 2000)
	var missMu sync.Mutex // the workers append to misses concurrently

	// Parallelizing txins provides a small speed up, especially
	// during first import when we write misses to the db directly. We
	// also have a dedicated worker for prevout misses during
	// firstImport.

	nWorkers := 8
	wch := make(chan *txInRec, 4)
	mch := make(chan *prevoutMiss, 64)
	var (
		wg sync.WaitGroup
		mg sync.WaitGroup
	)

	missWriter := func() { // prevoutMiss recorder
		defer mg.Done()
		for miss := range mch {
			// Write it to the DB directly.
			if stmt != nil {
				if err := recordPrevoutMiss(db, miss.txId, miss.n, miss.prevOutHash); err != nil {
					errCh <- fmt.Errorf("pgTxInWriter2: %v", err)
				}
			}
		}
	}

	worker := func() {
		defer wg.Done()
		for tr := range wch {

			t := tr.txIn
			var wb interface{}
			if t.Witness != nil {
				var b bytes.Buffer
				blkchain.BinWrite(&t.Witness, &b)
				wb = b.Bytes()
			}

			var prevOutTxId *int64
			if t.PrevOut.N != 0xffffffff { // not a coinbase
				prevOutTxId = tr.idCache.check(t.PrevOut.Hash)

				if prevOutTxId == nil { // cache miss
					if firstImport {
						mch <- &prevoutMiss{tr.txId, tr.n, t.PrevOut.Hash}
					} else {
						// Remember it; it will be written just when needed.
						missMu.Lock()
						misses = append(misses, &prevoutMiss{tr.txId, tr.n, t.PrevOut.Hash})
						missMu.Unlock()
					}
				}
			}

			if stmt != nil {
				if _, err := stmt.Exec(
					tr.txId,
					tr.n,
					prevOutTxId,
					int32(t.PrevOut.N),
					t.ScriptSig,
					int32(t.Sequence),
					wb,
				); err != nil {
					errCh <- fmt.Errorf("pgTxInWriter3: %v", err)
				}
			}
		}
	}

	// Initial worker start; Add before go, see pgTxWriter.
	mg.Add(1)
	go missWriter()
	for i := 0; i < nWorkers; i++ {
		wg.Add(1)
		go worker()
	}

	for tr := range c {
		if tr == nil || tr.txIn == nil { // commit signal
			// Close the channels, wait for the workers to finish.
			close(wch)
			wg.Wait()
			close(mch)
			mg.Wait()
			// Commit.
			if err := commit(stmt, txn, misses); err != nil {
				errCh <- fmt.Errorf("pgTxInWriter4: %v", err)
			}
			misses = misses[:0]
			var err error
			txn, stmt, err = begin(db, "txins", cols)
			if err != nil {
				errCh <- fmt.Errorf("pgTxInWriter5: %v", err)
			}
			if tr != nil && tr.sync != nil {
				tr.sync <- true
			}
			// Make new channels, restart the workers.
			mch = make(chan *prevoutMiss, 64)
			mg.Add(1)
			go missWriter()
			wch = make(chan *txInRec, 4)
			for i := 0; i < nWorkers; i++ {
				wg.Add(1)
				go worker()
			}
			continue
		}
		wch <- tr
	}
	close(wch)
	wg.Wait()
	close(mch)
	mg.Wait()

	log.Printf("TxIn writer channel closed, committing transaction.")
	if err := commit(stmt, txn, misses); err != nil {
		errCh <- fmt.Errorf("pgTxInWriter6: %v", err)
	}
	log.Printf("TxIn writer done.")
}

func pgTxOutWriter(c chan *txOutRec, errCh chan<- error, db *sql.DB, utxo isUTXOer) {
	defer writerWg.Done()

	cols := []string{"tx_id", "n", "value", "scriptpubkey", "spent"}

	txn, stmt, err := begin(db, "txouts", cols)
	if err != nil {
		errCh <- fmt.Errorf("pgTxOutWriter1: %v", err)
		return
	}

	// The call to IsUTXO() is very time consuming because it involves
	// LevelDb; parallelizing here provides a huge speed up.
	nWorkers := 8
	wch := make(chan *txOutRec, 4)
	var wg sync.WaitGroup
	worker := func() {
		defer wg.Done()
		for tr := range wch {
			var spent bool
			if utxo != nil {
				// NB: Some early unspent coins are not in the UTXO set,
				// probably because they are not spendable? So the spent
				// flag could also be interpreted as unspendable.
				isUTXO, err := utxo.IsUTXO(tr.hash, uint32(tr.n))
				if err != nil {
					errCh <- fmt.Errorf("pgTxOutWriter2: %v", err)
				}
				spent = !isUTXO
			}
			if stmt != nil {
				t := tr.txOut
				if _, err := stmt.Exec(
					tr.txId,
					tr.n,
					t.Value,
					t.ScriptPubKey,
					spent,
				); err != nil {
					errCh <- fmt.Errorf("pgTxOutWriter3: %v", err)
				}
			}
		}
	}

	// Initial worker start; Add before go, see pgTxWriter.
	for i := 0; i < nWorkers; i++ {
		wg.Add(1)
		go worker()
	}

	for tr := range c {
		if tr == nil || tr.txOut == nil { // commit signal
			// Close the channel, wait for the workers to finish.
			close(wch)
			wg.Wait()
			// Commit.
			if err := commit(stmt, txn, nil); err != nil {
				errCh <- fmt.Errorf("pgTxOutWriter4: %v", err)
			}
			var err error
			txn, stmt, err = begin(db, "txouts", cols)
			if err != nil {
				errCh <- fmt.Errorf("pgTxOutWriter5: %v", err)
			}
			if tr != nil && tr.sync != nil {
				tr.sync <- true
			}
			// Make a new channel, restart the workers.
			wch = make(chan *txOutRec, 4)
			for i := 0; i < nWorkers; i++ {
				wg.Add(1)
				go worker()
			}
			continue
		}
		wch <- tr
	}
	close(wch)
	wg.Wait()

	log.Printf("TxOut writer channel closed, committing transaction.")
	if err := commit(stmt, txn, nil); err != nil {
		errCh <- fmt.Errorf("pgTxOutWriter6: %v", err)
	}
	log.Printf("TxOut writer done.")
}

func begin(db *sql.DB, table string, cols []string) (*sql.Tx, *sql.Stmt, error) {
	if db == nil {
		return nil, nil, nil
	}

	txn, err := db.Begin()
	if err != nil {
		return nil, nil, err
	}
	if _, err := txn.Exec("SET CONSTRAINTS ALL DEFERRED"); err != nil {
		return nil, nil, err
	}
	stmt, err := txn.Prepare(pq.CopyIn(table, cols...))
	if err != nil {
		return nil, nil, err
	}
	return txn, stmt, nil
}

func commit(stmt *sql.Stmt, txn *sql.Tx, misses []*prevoutMiss) (err error) {
	if stmt == nil {
		return nil
	}
	if _, err = stmt.Exec(); err != nil {
		return err
	}
	if err = stmt.Close(); err != nil {
		return err
	}
	// We do this here because it is not possible to use Exec() during
	// a pq.CopyIn operation, but we do want it done prior to the
	// Commit. Note that misses is always empty during first import,
	// which writes misses to the db directly, bypassing the misses slice.
	if len(misses) > 0 {
		if err = recordPrevoutMisses(txn, misses); err != nil {
			return err
		}
		// TODO: Make the parallel argument below configurable?
		if err = fixPrevoutTxId(txn, 4, false); err != nil { // also truncates _prevout_miss
			return err
		}
	}
	if errors {
		log.Printf("Not committing transaction because of prior errors.")
		return fmt.Errorf("not committing transaction because of prior errors")
	}
	return txn.Commit()
}
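
// begin/commit wrap lib/pq's COPY protocol: begin opens a transaction and
// prepares a pq.CopyIn statement, each stmt.Exec(args...) buffers one row,
// and commit flushes the COPY with an empty Exec(), closes the statement
// and commits. A minimal sketch against a hypothetical "things" table
// (illustrative only):
//
//	txn, stmt, err := begin(db, "things", []string{"id", "name"})
//	if err != nil {
//		log.Fatal(err)
//	}
//	if _, err := stmt.Exec(1, "foo"); err != nil { // buffers a row
//		log.Fatal(err)
//	}
//	if err := commit(stmt, txn, nil); err != nil { // flush + COMMIT
//		log.Fatal(err)
//	}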

func getHeightAndHashes(db *sql.DB, back int) (map[int][]blkchain.Uint256, error) {
	if db == nil {
		return nil, nil
	}

	stmt := `SELECT height, hash FROM blocks
              WHERE height > (SELECT MAX(height) FROM blocks) - $1`

	rows, err := db.Query(stmt, back)
	if err != nil {
		return nil, fmt.Errorf("getHeightAndHashes: %v", err)
	}
	defer rows.Close()

	hashes := make(map[int][]blkchain.Uint256)

	for rows.Next() {
		var (
			height int
			hash   blkchain.Uint256
		)
		if err := rows.Scan(&height, &hash); err != nil {
			return nil, err
		}
		hashes[height] = append(hashes[height], hash)
	}
	if len(hashes) > 0 {
		return hashes, nil
	}
	return nil, rows.Err()
}

func getLastBlockId(db *sql.DB) (int, error) {
	if db == nil {
		return 0, nil
	}

	rows, err := db.Query("SELECT id FROM blocks ORDER BY id DESC LIMIT 1")
	if err != nil {
		return 0, err
	}
	defer rows.Close()

	if rows.Next() {
		var id int
		if err := rows.Scan(&id); err != nil {
			return 0, err
		}
		return id, nil
	}
	return 0, rows.Err()
}

func getLastTxId(db *sql.DB) (int64, error) {
	if db == nil {
		return 0, nil
	}

	rows, err := db.Query("SELECT id FROM txs ORDER BY id DESC LIMIT 1")
	if err != nil {
		return 0, err
	}
	defer rows.Close()

	if rows.Next() {
		var id int64
		if err := rows.Scan(&id); err != nil {
			return 0, err
		}
		return id, nil
	}
	return 0, rows.Err()
}

// This could be a db connection or a transaction.
type execer interface {
	Exec(query string, args ...interface{}) (sql.Result, error)
	Query(query string, args ...interface{}) (*sql.Rows, error)
}

func createPrevoutMissTable(db execer) error {
	sql := `
      CREATE UNLOGGED TABLE IF NOT EXISTS _prevout_miss (
        id            SERIAL NOT NULL PRIMARY KEY
       ,tx_id         BIGINT NOT NULL
       ,n             INT NOT NULL
       ,prevout_hash  BYTEA NOT NULL
      );
      TRUNCATE _prevout_miss RESTART IDENTITY;
    `
	_, err := db.Exec(sql)
	return err
}

func clearPrevoutMissTable(db execer) error {
	_, err := db.Exec("TRUNCATE _prevout_miss RESTART IDENTITY")
	return err
}

func dropPrevoutMissTable(db *sql.DB) error {
	_, err := db.Exec("DROP TABLE IF EXISTS _prevout_miss")
	return err
}

func prevoutMissMaxId(db execer) (int, error) {
	// COALESCE so that an empty table scans as 0 rather than NULL.
	stmt := `SELECT COALESCE(MAX(id), 0) FROM _prevout_miss`
	rows, err := db.Query(stmt)
	if err != nil {
		return 0, err
	}
	defer rows.Close()
	var max int
	for rows.Next() {
		if err := rows.Scan(&max); err != nil {
			return 0, err
		}
	}
	return max, nil
}

func recordPrevoutMiss(db *sql.DB, txId int64, n int, prevoutHash blkchain.Uint256) error {
	stmt := "INSERT INTO _prevout_miss (tx_id, n, prevout_hash) VALUES ($1, $2, $3)"
	_, err := db.Exec(stmt, txId, n, prevoutHash[:])
	return err
}

type prevoutMiss struct {
	txId        int64
	n           int
	prevOutHash blkchain.Uint256
}

func recordPrevoutMisses(txn *sql.Tx, misses []*prevoutMiss) error {
	stmt, err := txn.Prepare(pq.CopyIn("_prevout_miss", "tx_id", "n", "prevout_hash"))
	if err != nil {
		return err
	}
	for i, m := range misses {
		if _, err = stmt.Exec(m.txId, m.n, m.prevOutHash[:]); err != nil {
			return err
		}
		misses[i] = nil // so that they can be freed
	}
	if _, err = stmt.Exec(); err != nil {
		return err
	}
	return stmt.Close()
}

// Most of the prevout_tx_id's should already be set during the import,
// but we need to correct the remaining ones. You can likely avoid this
// by increasing the txIdCache size. After the first import they will
// be looked up on the fly as needed, but a larger cache is still
// important. We run the update in parallel to speed things up.
func fixPrevoutTxId(db execer, parallel int, verbose bool) error {
	if parallel == 0 {
		parallel = 1
	}
	stmt := `
      UPDATE txins i
         SET prevout_tx_id = t.id
        FROM _prevout_miss m
        JOIN txs t ON m.prevout_hash = t.txid
       WHERE m.id >= $1 AND m.id < $2
         AND i.tx_id = m.tx_id
         AND i.n = m.n`

	max, err := prevoutMissMaxId(db)
	if err != nil {
		return err
	}
	if verbose {
		log.Printf(" max prevoutMiss id: %d parallel: %d", max, parallel)
	}

	if max == 0 {
		return nil // nothing to do
	}

	step := max
	if parallel > 1 {
		step = max / (parallel - 1) // -1 so that we do not have one range left over
	}
	if step > 10000 {
		step = 10000 // kinda arbitrary - the idea is to show progress
	}
	if step < 10 {
		step = 10
	}

	// Start the workers.
	type startEnd struct{ start, end int }
	queue := make(chan startEnd)
	var wg sync.WaitGroup
	for i := 0; i < parallel; i++ {
		wg.Add(1)
		go func(queue chan startEnd) {
			defer wg.Done()

			for se := range queue {
				if verbose {
					log.Printf(" processing range [%d, %d) of %d...", se.start, se.end, max)
				}
				if _, err := db.Exec(stmt, se.start, se.end); err != nil {
					log.Printf(" range [%d, %d) error: %v", se.start, se.end, err)
				}
			}
		}(queue)
	}

	for i := 1; i <= max; i += step {
		queue <- startEnd{i, i + step}
	}
	close(queue)
	wg.Wait()

	return clearPrevoutMissTable(db)
}

func fixPrevoutTxIdAnalyze(db execer) error {
	_, err := db.Exec(`ANALYZE txins, _prevout_miss, txs`)
	return err
}

func createTables(db *sql.DB) error {
	sqlTables := `
  CREATE TABLE blocks (
    id            INT NOT NULL
   ,height        INT NOT NULL -- not the same as id, because of orphans
   ,hash          BYTEA NOT NULL
   ,version       INT NOT NULL
   ,prevhash      BYTEA NOT NULL
   ,merkleroot    BYTEA NOT NULL
   ,time          INT NOT NULL
   ,bits          INT NOT NULL
   ,nonce         INT NOT NULL
   ,orphan        BOOLEAN NOT NULL DEFAULT false
   ,size          INT NOT NULL
   ,base_size     INT NOT NULL
   ,weight        INT NOT NULL
   ,virt_size     INT NOT NULL
  );

  CREATE TABLE txs (
    id            BIGINT NOT NULL
   ,txid          BYTEA NOT NULL
   ,version       INT NOT NULL
   ,locktime      INT NOT NULL
   ,size          INT NOT NULL
   ,base_size     INT NOT NULL
   ,weight        INT NOT NULL
   ,virt_size     INT NOT NULL
  );

  CREATE TABLE block_txs (
    block_id      INT NOT NULL
   ,n             SMALLINT NOT NULL
   ,tx_id         BIGINT NOT NULL
  );

  CREATE TABLE txins (
    tx_id         BIGINT NOT NULL
   ,n             SMALLINT NOT NULL
   ,prevout_tx_id BIGINT -- can be NULL for coinbase
   ,prevout_n     SMALLINT NOT NULL
   ,scriptsig     BYTEA NOT NULL
   ,sequence      INT NOT NULL
   ,witness       BYTEA
  );

  CREATE TABLE txouts (
    tx_id         BIGINT NOT NULL
   ,n             SMALLINT NOT NULL
   ,value         BIGINT NOT NULL
   ,scriptpubkey  BYTEA NOT NULL
   ,spent         BOOL NOT NULL
  );
`
	_, err := db.Exec(sqlTables)
	return err
}
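
// Given this schema, a block's transactions can be listed by joining
// through block_txs, e.g. (illustrative query only):
//
//	SELECT t.txid, bt.n
//	  FROM blocks b
//	  JOIN block_txs bt ON bt.block_id = b.id
//	  JOIN txs t ON t.id = bt.tx_id
//	 WHERE b.height = 100000 AND NOT b.orphan
//	 ORDER BY bt.n;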

// The approach we take here is to disable autovacuum during the first
// import, then enable it at the end and let it run. Because of hint
// bits, the vacuum process will effectively rewrite every page of the
// table; this is explained here:
// https://dba.stackexchange.com/questions/130496/is-it-worth-it-to-run-vacuum-on-a-table-that-only-receives-inserts
// NB: Toggling UNLOGGED/LOGGED does not work, as changing this setting rewrites the tables.
func setTableStorageParams(db *sql.DB) error {
	for _, table := range []string{"blocks", "txs", "block_txs", "txins", "txouts"} {
		stmt := fmt.Sprintf(`ALTER TABLE %s SET (autovacuum_enabled=false)`, table)
		if _, err := db.Exec(stmt); err != nil {
			return err
		}
	}
	return nil
}

func resetTableStorageParams(db *sql.DB) error {
	for _, table := range []string{"blocks", "txs", "block_txs", "txins", "txouts"} {
		stmt := fmt.Sprintf(`ALTER TABLE %s RESET (autovacuum_enabled)`, table)
		if _, err := db.Exec(stmt); err != nil {
			return err
		}
	}
	return nil
}

func createPgcrypto(db *sql.DB) error {
	_, err := db.Exec("CREATE EXTENSION IF NOT EXISTS pgcrypto")
	return err
}

// Create only the indexes necessary to run fixPrevoutTxId(). Since we
// are updating txins, the presence of the txins(addr_prefix(...)) index
// would slow the updates down a great deal (updates cause an index
// update, which calls the function).
func createIndexes1(db *sql.DB, verbose bool) error {
	var start time.Time
	if verbose {
		log.Printf(" Starting txins primary key...")
	}
	start = time.Now()
	if _, err := db.Exec(`
      DO $$
      BEGIN
        IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage
                        WHERE table_name = 'txins' AND constraint_name = 'txins_pkey') THEN
          ALTER TABLE txins ADD CONSTRAINT txins_pkey PRIMARY KEY(tx_id, n);
        END IF;
      END
      $$;`); err != nil {
		return err
	}
	if verbose {
		log.Printf(" ...done in %s. Starting txs txid (hash) index...", time.Since(start).Round(time.Millisecond))
	}
	start = time.Now()
	if _, err := db.Exec("CREATE UNIQUE INDEX IF NOT EXISTS txs_txid_idx ON txs(txid);"); err != nil {
		return err
	}
	if verbose {
		log.Printf(" ...done in %s.", time.Since(start).Round(time.Millisecond))
	}
	return nil
}

func createIndexes2(db *sql.DB, verbose bool) error {
	var start time.Time
	// Adding a constraint or index if it does not exist is a little tricky in PG.
	if verbose {
		log.Printf(" Starting blocks primary key...")
	}
	start = time.Now()
	if _, err := db.Exec(`
      DO $$
      BEGIN
        IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage
                        WHERE table_name = 'blocks' AND constraint_name = 'blocks_pkey') THEN
          ALTER TABLE blocks ADD CONSTRAINT blocks_pkey PRIMARY KEY(id);
        END IF;
      END
      $$;`); err != nil {
		return err
	}
	if verbose {
		log.Printf(" ...done in %s. Starting blocks prevhash index...", time.Since(start).Round(time.Millisecond))
	}
	start = time.Now()
	if _, err := db.Exec("CREATE INDEX IF NOT EXISTS blocks_prevhash_idx ON blocks(prevhash);"); err != nil {
		return err
	}
	if verbose {
		log.Printf(" ...done in %s. Starting blocks hash index...", time.Since(start).Round(time.Millisecond))
	}
	start = time.Now()
	if _, err := db.Exec("CREATE UNIQUE INDEX IF NOT EXISTS blocks_hash_idx ON blocks(hash);"); err != nil {
		return err
	}
	if verbose {
		log.Printf(" ...done in %s. Starting blocks height index...", time.Since(start).Round(time.Millisecond))
	}
	start = time.Now()
	if _, err := db.Exec("CREATE INDEX IF NOT EXISTS blocks_height_idx ON blocks(height);"); err != nil {
		return err
	}
	if verbose {
		log.Printf(" ...done in %s. Starting txs primary key...", time.Since(start).Round(time.Millisecond))
	}
	start = time.Now()
	if _, err := db.Exec(`
      DO $$
      BEGIN
        IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage
                        WHERE table_name = 'txs' AND constraint_name = 'txs_pkey') THEN
          ALTER TABLE txs ADD CONSTRAINT txs_pkey PRIMARY KEY(id);
        END IF;
      END
      $$;`); err != nil {
		return err
	}
	if verbose {
		log.Printf(" ...done in %s. Starting block_txs block_id, n primary key...", time.Since(start).Round(time.Millisecond))
	}
	start = time.Now()
	if _, err := db.Exec(`
      DO $$
      BEGIN
        IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage
                        WHERE table_name = 'block_txs' AND constraint_name = 'block_txs_pkey') THEN
          ALTER TABLE block_txs ADD CONSTRAINT block_txs_pkey PRIMARY KEY(block_id, n);
        END IF;
      END
      $$;`); err != nil {
		return err
	}
	if verbose {
		log.Printf(" ...done in %s. Starting block_txs tx_id index...", time.Since(start).Round(time.Millisecond))
	}
	start = time.Now()
	if _, err := db.Exec("CREATE INDEX IF NOT EXISTS block_txs_tx_id_idx ON block_txs(tx_id);"); err != nil {
		return err
	}
	if verbose {
		log.Printf(" ...done in %s. Creating hash_type function...", time.Since(start).Round(time.Millisecond))
	}
	start = time.Now()
	if _, err := db.Exec(`
      CREATE OR REPLACE FUNCTION hash_type(_hash BYTEA) RETURNS TEXT AS $$
      BEGIN
        IF EXISTS (SELECT 1 FROM blocks WHERE hash = _hash) THEN
          RETURN 'block';
        ELSIF EXISTS (SELECT 1 FROM txs WHERE txid = _hash) THEN
          RETURN 'tx';
        END IF;
        RETURN NULL;
      END;
      $$ LANGUAGE plpgsql;
      `); err != nil {
		return err
	}
	if verbose {
		log.Printf(" ...done in %s. Starting txins (prevout_tx_id, prevout_n) index...", time.Since(start).Round(time.Millisecond))
	}
	start = time.Now()
	if _, err := db.Exec("CREATE INDEX IF NOT EXISTS txins_prevout_tx_id_prevout_n_idx ON txins(prevout_tx_id, prevout_n);"); err != nil {
		return err
	}
	if verbose {
		log.Printf(" ...done in %s. Starting txouts primary key...", time.Since(start).Round(time.Millisecond))
	}
	start = time.Now()
	if _, err := db.Exec(`
      DO $$
      BEGIN
        IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage
                        WHERE table_name = 'txouts' AND constraint_name = 'txouts_pkey') THEN
          ALTER TABLE txouts ADD CONSTRAINT txouts_pkey PRIMARY KEY(tx_id, n);
        END IF;
      END
      $$;`); err != nil {
		return err
	}
	if verbose {
		log.Printf(" ...done in %s. Starting txouts address prefix index...", time.Since(start).Round(time.Millisecond))
	}
	start = time.Now()
	if _, err := db.Exec(`
      CREATE OR REPLACE FUNCTION extract_address(scriptPubKey BYTEA) RETURNS BYTEA AS $$
      BEGIN
        IF SUBSTR(scriptPubKey, 1, 3) = E'\\x76a914' THEN -- P2PKH
          RETURN SUBSTR(scriptPubKey, 4, 20);
        ELSIF SUBSTR(scriptPubKey, 1, 2) = E'\\xa914' THEN -- P2SH
          RETURN SUBSTR(scriptPubKey, 3, 20);
        ELSIF SUBSTR(scriptPubKey, 1, 2) = E'\\x0014' THEN -- P2WPKH
          RETURN SUBSTR(scriptPubKey, 3, 20);
        ELSIF SUBSTR(scriptPubKey, 1, 2) = E'\\x0020' THEN -- P2WSH
          RETURN SUBSTR(scriptPubKey, 3, 32);
        END IF;
        RETURN NULL;
      END;
      $$ LANGUAGE plpgsql IMMUTABLE;

      -- Cast the first 8 bytes of a BYTEA as a BIGINT
      CREATE OR REPLACE FUNCTION bytes2int8(bytes BYTEA) RETURNS BIGINT AS $$
      BEGIN
        RETURN SUBSTR(bytes::text, 2, 16)::bit(64)::bigint;
      END;
      $$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;

      -- Address prefix (txout)
      CREATE OR REPLACE FUNCTION addr_prefix(scriptPubKey BYTEA) RETURNS BIGINT AS $$
      BEGIN
        RETURN public.bytes2int8(public.extract_address(scriptPubKey));
      END;
      $$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;

      CREATE INDEX IF NOT EXISTS txouts_addr_prefix_tx_id_idx ON txouts(addr_prefix(scriptpubkey), tx_id);
      `); err != nil {
		return err
	}
	if verbose {
		log.Printf(" ...done in %s. Starting txins address prefix index...", time.Since(start).Round(time.Millisecond))
	}
	start = time.Now()
	if _, err := db.Exec(`
      CREATE OR REPLACE FUNCTION parse_witness(witness BYTEA) RETURNS BYTEA[] AS $$
      DECLARE
        stack BYTEA[];
        len   INT;
        pos   INT = 1;
        slen  INT;
      BEGIN
        IF witness IS NULL OR witness = '' THEN
          RETURN NULL;
        END IF;
        len = GET_BYTE(witness, 0); -- this is a varint, but whatever
        WHILE len > 0 LOOP
          slen = GET_BYTE(witness, pos);
          IF slen = 253 THEN
            slen = GET_BYTE(witness, pos+1) + GET_BYTE(witness, pos+2)*256;
            pos = pos+2;
            -- NB: There is a possibility of a 4-byte compact, but no transaction uses it
          END IF;
          stack = stack || SUBSTR(witness, pos+2, slen);
          pos = pos + slen + 1;
          len = len - 1;
        END LOOP;
        RETURN stack;
      END;
      $$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;

      CREATE OR REPLACE FUNCTION extract_address(scriptsig BYTEA, witness BYTEA) RETURNS BYTEA AS $$
      DECLARE
        pub  BYTEA;
        sha  BYTEA;
        wits BYTEA[];
        len  INT;
        pos  INT;
        op   INT;
      BEGIN
        IF LENGTH(scriptsig) = 0 OR scriptsig IS NULL THEN -- Native SegWit: P2WSH or P2WPKH
          wits = public.parse_witness(witness);
          pub = wits[array_length(wits, 1)];
          sha = public.digest(pub, 'sha256');
          IF ARRAY_LENGTH(wits, 1) = 2 AND LENGTH(pub) = 33 THEN -- Most likely a P2WPKH
            RETURN public.digest(sha, 'ripemd160');
          ELSE -- Most likely a P2WSH
            RETURN sha;
          END IF;
        ELSE
          len = GET_BYTE(scriptsig, 0);
          IF len = LENGTH(scriptsig) - 1 THEN -- Most likely a P2SH (or P2SH-P2W*)
            RETURN public.digest(public.digest(SUBSTR(scriptsig, 2), 'sha256'), 'ripemd160');
          ELSE -- P2PKH or longer P2SH, either way the last thing is what we need
            pos = 0;
            WHILE pos < LENGTH(scriptsig)-1 LOOP
              op = GET_BYTE(scriptsig, pos);
              IF op > 0 AND op < 76 THEN
                len = op;
                pos = pos + 1;
              ELSEIF op = 76 THEN
                len = GET_BYTE(scriptsig, pos+1);
                pos = pos + 2;
              ELSEIF op = 77 THEN
                len = GET_BYTE(scriptsig, pos+1) + GET_BYTE(scriptsig, pos+2)*256;
                pos = pos + 3;
              ELSE
                pos = pos + 1;
                CONTINUE;
              END IF;
              pub = SUBSTR(scriptsig, pos+1, len);
              pos = pos + len;
            END LOOP;
            RETURN public.digest(public.digest(pub, 'sha256'), 'ripemd160');
          END IF;
        END IF;
        RETURN NULL;
      END;
      $$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;

      -- Address prefix (txin)
      CREATE OR REPLACE FUNCTION addr_prefix(scriptsig BYTEA, witness BYTEA) RETURNS BIGINT AS $$
      BEGIN
        RETURN public.bytes2int8(public.extract_address(scriptsig, witness));
      END;
      $$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;

      -- Partial/conditional index because coinbase txin scriptsigs are garbage
      CREATE INDEX IF NOT EXISTS txins_addr_prefix_tx_id_idx ON txins(addr_prefix(scriptsig, witness), tx_id)
       WHERE prevout_tx_id IS NOT NULL;
      `); err != nil {
		return err
	}
	if verbose {
		log.Printf(" ...done in %s.", time.Since(start).Round(time.Millisecond))
	}
	return nil
}

func createConstraints(db *sql.DB, verbose bool) error {
	var start time.Time
	if verbose {
		log.Printf(" Starting block_txs block_id foreign key...")
	}
	start = time.Now()
	if _, err := db.Exec(`
      DO $$
      BEGIN
        -- NB: table_name is the target/foreign table
        IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage
                        WHERE table_name = 'blocks' AND constraint_name = 'block_txs_block_id_fkey') THEN
          ALTER TABLE block_txs ADD CONSTRAINT block_txs_block_id_fkey FOREIGN KEY (block_id) REFERENCES blocks(id) ON DELETE CASCADE NOT VALID;
        END IF;
      END
      $$;`); err != nil {
		return err
	}
	if verbose {
		log.Printf(" ...done in %s. Starting block_txs tx_id foreign key...", time.Since(start).Round(time.Millisecond))
	}
	start = time.Now()
	if _, err := db.Exec(`
      DO $$
      BEGIN
        -- NB: table_name is the target/foreign table
        IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage
                        WHERE table_name = 'txs' AND constraint_name = 'block_txs_tx_id_fkey') THEN
          ALTER TABLE block_txs ADD CONSTRAINT block_txs_tx_id_fkey FOREIGN KEY (tx_id) REFERENCES txs(id) ON DELETE CASCADE NOT VALID DEFERRABLE;
        END IF;
      END
      $$;`); err != nil {
		return err
	}
	if verbose {
		log.Printf(" ...done in %s. Starting txins tx_id foreign key...", time.Since(start).Round(time.Millisecond))
	}
	start = time.Now()
	if _, err := db.Exec(`
      DO $$
      BEGIN
        -- NB: table_name is the target/foreign table
        IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage
                        WHERE table_name = 'txs' AND constraint_name = 'txins_tx_id_fkey') THEN
          ALTER TABLE txins ADD CONSTRAINT txins_tx_id_fkey FOREIGN KEY (tx_id) REFERENCES txs(id) ON DELETE CASCADE NOT VALID DEFERRABLE;
        END IF;
      END
      $$;`); err != nil {
		return err
	}
	if verbose {
		log.Printf(" ...done in %s. Starting txouts tx_id foreign key...", time.Since(start).Round(time.Millisecond))
	}
	start = time.Now()
	if _, err := db.Exec(`
      DO $$
      BEGIN
        -- NB: table_name is the target/foreign table
        IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage
                        WHERE table_name = 'txs' AND constraint_name = 'txouts_tx_id_fkey') THEN
          ALTER TABLE txouts ADD CONSTRAINT txouts_tx_id_fkey FOREIGN KEY (tx_id) REFERENCES txs(id) ON DELETE CASCADE NOT VALID DEFERRABLE;
        END IF;
      END
      $$;`); err != nil {
		return err
	}
	if verbose {
		log.Printf(" ...done in %s.", time.Since(start).Round(time.Millisecond))
	}
	return nil
}

// Set the orphan status starting from the highest block and going
// backwards, up to limit. If limit is 0, the whole table is updated.
//
// The WITH RECURSIVE part connects rows by joining prevhash to hash,
// thereby building a list which starts at the highest height and goes
// towards the beginning until no parent can be found.
//
// Then we LEFT JOIN the above to the blocks table, and where there is
// no match (x.id IS NULL) we mark the block as orphan.
//
// If the chain is split, i.e. there is more than one row at the
// highest height, then no blocks in the split will be marked as
// orphan, which is fine.
func (w *PGWriter) SetOrphans(limit int) error {
	var limitNSql string
	if limit > 0 {
		limitNSql = fmt.Sprintf("WHERE n < %d", limit+50)
	}
	if _, err := w.db.Exec(fmt.Sprintf(`
    DO $$
    DECLARE
      min_id INT;
    BEGIN
      -- select an id going back limit rows
      SELECT MIN(id) INTO min_id FROM (SELECT id FROM blocks ORDER BY id DESC LIMIT %d) x;
      -- a limit of 0 leaves min_id as NULL, need to correct that
      IF (min_id IS NULL) THEN
        min_id = -1;
      END IF;
      -- the key idea is that the recursive part goes back further than the
      -- limit imposed by min_id
      UPDATE blocks
         SET orphan = a.orphan
        FROM (
          SELECT blocks.id, x.id IS NULL AS orphan
            FROM blocks
            LEFT JOIN (
              WITH RECURSIVE recur(id, prevhash, n) AS (
                -- non-recursive term, executed once
                SELECT id, prevhash, 0 AS n
                  FROM blocks
                 -- this should be faster than MAX(height)
                 WHERE height IN (SELECT height FROM blocks ORDER BY height DESC LIMIT 1)
                UNION ALL
                -- recursive term; recur refers to the previous iteration result.
                -- Iteration stops when the previous row prevhash finds no match OR
                -- when n reaches a limit (see limitNSql above)
                SELECT blocks.id, blocks.prevhash, n+1 AS n
                  FROM recur
                  JOIN blocks ON blocks.hash = recur.prevhash
                  %s
              )
              SELECT recur.id, recur.prevhash
                FROM recur
            ) x ON blocks.id = x.id
        ) a
       WHERE blocks.id = a.id AND blocks.id >= min_id;
    END
    $$`, limit, limitNSql)); err != nil {
		return err
	}
	return nil
}
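
// After a run, the orphan flag partitions any height that has competing
// blocks, which can be inspected with a query along these lines
// (illustrative only):
//
//	SELECT height, COUNT(*) FILTER (WHERE orphan) AS orphans
//	  FROM blocks GROUP BY height HAVING COUNT(*) > 1;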

func createTxinsTriggers(db *sql.DB) error {
	if _, err := db.Exec(`

      CREATE OR REPLACE FUNCTION txins_after_trigger_func() RETURNS TRIGGER AS $$
      BEGIN
        IF (TG_OP = 'DELETE') THEN
          IF OLD.prevout_tx_id IS NOT NULL THEN
            UPDATE txouts SET spent = FALSE
             WHERE tx_id = OLD.prevout_tx_id AND n = OLD.prevout_n;
          END IF;
          RETURN OLD;
        ELSIF (TG_OP = 'UPDATE' OR TG_OP = 'INSERT') THEN
          IF NEW.prevout_tx_id IS NOT NULL THEN
            UPDATE txouts SET spent = TRUE
             WHERE tx_id = NEW.prevout_tx_id AND n = NEW.prevout_n;
            IF NOT FOUND THEN
              RAISE EXCEPTION 'Unknown tx_id:prevout_n combination: %:%', NEW.prevout_tx_id, NEW.prevout_n;
            END IF;
          END IF;
          RETURN NEW;
        END IF;
        RETURN NULL;
      END;
      $$ LANGUAGE plpgsql PARALLEL SAFE;

      CREATE CONSTRAINT TRIGGER txins_after_trigger
      AFTER INSERT OR UPDATE OR DELETE ON txins DEFERRABLE INITIALLY DEFERRED
      FOR EACH ROW EXECUTE PROCEDURE txins_after_trigger_func();

      `); err != nil {
		return err
	}
	return nil
}

func warmupCache(db *sql.DB, cache *txIdCache, blocks int) error {
	stmt := `
      SELECT t.id, t.txid, o.cnt
        FROM txs t
        JOIN LATERAL (
          SELECT COUNT(1) AS cnt
            FROM txouts o
           WHERE o.tx_id = t.id
        ) o ON true
       WHERE id > (
          SELECT MIN(tx_id) FROM block_txs bt
            JOIN (
              SELECT id FROM blocks ORDER BY id DESC LIMIT $1
            ) b ON b.id = bt.block_id
        )
       ORDER BY t.id;
    `
	rows, err := db.Query(stmt, blocks)
	if err != nil {
		return err
	}
	defer rows.Close()
	var (
		txid int64
		hash blkchain.Uint256
		cnt  int
	)
	for rows.Next() {
		if err := rows.Scan(&txid, &hash, &cnt); err != nil {
			return err
		}
		cache.add(hash, txid, cnt)
	}
	return rows.Err()
}

func takeSnapshot(db *sql.DB, dataset string, height int, tag string) error {
	log.Printf("Taking a ZFS snapshot: %s@%d%s", dataset, height, tag)
	log.Printf(" calling pg_backup_start('blkchain', true)") // pg < 15: use pg_start_backup
	if _, err := db.Exec(`SELECT pg_backup_start('blkchain', true)`); err != nil {
		return err
	}
	log.Printf(" executing COPY (SELECT) TO PROGRAM 'zfs snapshot %s@%d%s'", dataset, height, tag)
	if _, err := db.Exec(fmt.Sprintf(`COPY (SELECT) TO PROGRAM 'zfs snapshot %s@%d%s'`, dataset, height, tag)); err != nil {
		return err
	}
	log.Printf(" calling pg_backup_stop()") // pg < 15: use pg_stop_backup
	if _, err := db.Exec(`SELECT pg_backup_stop()`); err != nil {
		return err
	}
	return nil
}

// TODO: Presently unused.
func waitForAutovacuum(db *sql.DB) error {
	avCount := func() (int, error) {
		stmt := `SELECT COUNT(1) FROM pg_stat_activity WHERE query LIKE 'autovacuum:%'`
		rows, err := db.Query(stmt)
		if err != nil {
			return 0, err
		}
		defer rows.Close()
		var cnt int
		for rows.Next() {
			if err := rows.Scan(&cnt); err != nil {
				return 0, err
			}
		}
		return cnt, nil
	}
	for {
		if cnt, err := avCount(); err != nil {
			log.Printf("waitForAutovacuum: %v", err)
			return err
		} else if cnt == 0 {
			return nil
		}
		time.Sleep(3 * time.Second)
	}
}