pg.go (2929B)
1 package postgres 2 3 import ( 4 "context" 5 "errors" 6 "fmt" 7 8 "github.com/jackc/pgx/v5/pgxpool" 9 10 "git.defalsify.org/vise.git/db" 11 ) 12 13 // pgDb is a Postgres backend implementation of the Db interface. 14 type pgDb struct { 15 *db.DbBase 16 conn *pgxpool.Pool 17 schema string 18 prefix uint8 19 } 20 21 // NewpgDb creates a new Postgres backed Db implementation. 22 func NewPgDb() *pgDb { 23 db := &pgDb{ 24 DbBase: db.NewDbBase(), 25 schema: "public", 26 } 27 return db 28 } 29 30 // WithSchema sets the Postgres schema to use for the storage table. 31 func(pdb *pgDb) WithSchema(schema string) *pgDb { 32 pdb.schema = schema 33 return pdb 34 } 35 36 // Connect implements Db. 37 func(pdb *pgDb) Connect(ctx context.Context, connStr string) error { 38 if pdb.conn != nil { 39 logg.WarnCtxf(ctx, "already connected", "conn", pdb.conn) 40 panic("already connected") 41 } 42 var err error 43 conn, err := pgxpool.New(ctx, connStr) 44 if err != nil { 45 return err 46 } 47 pdb.conn = conn 48 return pdb.prepare(ctx) 49 } 50 51 // Put implements Db. 52 func(pdb *pgDb) Put(ctx context.Context, key []byte, val []byte) error { 53 if !pdb.CheckPut() { 54 return errors.New("unsafe put and safety set") 55 } 56 k, err := pdb.ToKey(ctx, key) 57 if err != nil { 58 return err 59 } 60 tx, err := pdb.conn.Begin(ctx) 61 if err != nil { 62 return err 63 } 64 query := fmt.Sprintf("INSERT INTO %s.kv_vise (key, value) VALUES ($1, $2) ON CONFLICT(key) DO UPDATE SET value = $2;", pdb.schema) 65 _, err = tx.Exec(ctx, query, k, val) 66 if err != nil { 67 tx.Rollback(ctx) 68 return err 69 } 70 tx.Commit(ctx) 71 return nil 72 } 73 74 // Get implements Db. 75 func(pdb *pgDb) Get(ctx context.Context, key []byte) ([]byte, error) { 76 lk, err := pdb.ToKey(ctx, key) 77 if err != nil { 78 return nil, err 79 } 80 if lk.Translation != nil { 81 tx, err := pdb.conn.Begin(ctx) 82 if err != nil { 83 return nil, err 84 } 85 query := fmt.Sprintf("SELECT value FROM %s.kv_vise WHERE key = $1", pdb.schema) 86 rs, err := tx.Query(ctx, query, lk.Translation) 87 if err != nil { 88 return nil, err 89 } 90 defer rs.Close() 91 if rs.Next() { 92 r := rs.RawValues() 93 return r[0], nil 94 } 95 } 96 97 tx, err := pdb.conn.Begin(ctx) 98 if err != nil { 99 return nil, err 100 } 101 query := fmt.Sprintf("SELECT value FROM %s.kv_vise WHERE key = $1", pdb.schema) 102 rs, err := tx.Query(ctx, query, lk.Translation) 103 if err != nil { 104 return nil, err 105 } 106 defer rs.Close() 107 if !rs.Next() { 108 return nil, db.NewErrNotFound(key) 109 } 110 r := rs.RawValues() 111 return r[0], nil 112 } 113 114 // Close implements Db. 115 func(pdb *pgDb) Close() error { 116 pdb.Close() 117 return nil 118 } 119 120 // set up table 121 func(pdb *pgDb) prepare(ctx context.Context) error { 122 tx, err := pdb.conn.Begin(ctx) 123 if err != nil { 124 tx.Rollback(ctx) 125 return err 126 } 127 query := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.kv_vise ( 128 id SERIAL NOT NULL, 129 key BYTEA NOT NULL UNIQUE, 130 value BYTEA NOT NULL 131 ); 132 `, pdb.schema) 133 _, err = tx.Exec(ctx, query) 134 if err != nil { 135 tx.Rollback(ctx) 136 return err 137 } 138 139 err = tx.Commit(ctx) 140 if err != nil { 141 tx.Rollback(ctx) 142 return err 143 } 144 return nil 145 }