dump.go (1438B)
1 package postgres 2 3 import ( 4 "context" 5 "fmt" 6 7 "git.defalsify.org/vise.git/db" 8 ) 9 10 func (pdb *pgDb) Dump(ctx context.Context, key []byte) (*db.Dumper, error) { 11 tx, err := pdb.conn.BeginTx(ctx, defaultTxOptions) 12 if err != nil { 13 return nil, err 14 } 15 16 pdb.SetLanguage(nil) 17 lk, err := pdb.ToKey(ctx, key) 18 if err != nil { 19 return nil, err 20 } 21 k := lk.Default 22 23 query := fmt.Sprintf("SELECT key, value FROM %s.kv_vise WHERE key >= $1", pdb.schema) 24 rs, err := tx.Query(ctx, query, k) 25 if err != nil { 26 logg.Debugf("query fail", "err", err) 27 tx.Rollback(ctx) 28 return nil, err 29 } 30 defer tx.Commit(ctx) 31 32 if rs.Next() { 33 var kk []byte 34 var vv []byte 35 err = rs.Scan(&kk, &vv) 36 if err != nil { 37 return nil, err 38 } 39 pdb.it = rs 40 pdb.itBase = k 41 kk, err = pdb.DecodeKey(ctx, kk) 42 if err != nil { 43 return nil, err 44 } 45 return db.NewDumper(pdb.dumpFunc).WithClose(pdb.closeFunc).WithFirst(kk, vv), nil 46 } 47 48 return nil, db.NewErrNotFound(k) 49 } 50 51 func (pdb *pgDb) dumpFunc(ctx context.Context) ([]byte, []byte) { 52 var kk []byte 53 var vv []byte 54 if !pdb.it.Next() { 55 logg.DebugCtxf(ctx, "no more data in pg iterator") 56 pdb.it = nil 57 pdb.itBase = nil 58 return nil, nil 59 } 60 err := pdb.it.Scan(&kk, &vv) 61 if err != nil { 62 return nil, nil 63 } 64 k, err := pdb.DecodeKey(ctx, kk) 65 if err != nil { 66 return nil, nil 67 } 68 return k, vv 69 } 70 71 func (pdb *pgDb) closeFunc() error { 72 if pdb.it != nil { 73 pdb.it.Close() 74 pdb.it = nil 75 } 76 return nil 77 }