go-vise

Constrained Size Output Virtual Machine
Info | Log | Files | Refs | README | LICENSE

pg.go (2929B)


      1 package postgres
      2 
      3 import (
      4 	"context"
      5 	"errors"
      6 	"fmt"
      7 
      8 	"github.com/jackc/pgx/v5/pgxpool"
      9 
     10 	"git.defalsify.org/vise.git/db"
     11 )
     12 
// pgDb is a Postgres backend implementation of the Db interface.
//
// Key/value pairs are stored in a table named kv_vise inside the configured
// schema.
type pgDb struct {
	// Embedded base; CheckPut and ToKey are called through the receiver in
	// this file and are presumably promoted from here.
	*db.DbBase
	// conn is the connection pool. It is nil until Connect has been called.
	conn *pgxpool.Pool
	// schema is the Postgres schema that qualifies the kv_vise table in every
	// statement. NewPgDb sets it to "public"; WithSchema overrides it.
	schema string
	// NOTE(review): prefix is not referenced anywhere in this file; confirm
	// whether it is still needed.
	prefix uint8
}
     20 
     21 // NewpgDb creates a new Postgres backed Db implementation.
     22 func NewPgDb() *pgDb {
     23 	db := &pgDb{
     24 		DbBase: db.NewDbBase(),
     25 		schema: "public",
     26 	}
     27 	return db
     28 }
     29 
     30 // WithSchema sets the Postgres schema to use for the storage table.
     31 func(pdb *pgDb) WithSchema(schema string) *pgDb {
     32 	pdb.schema = schema
     33 	return pdb
     34 }
     35 
     36 // Connect implements Db.
     37 func(pdb *pgDb) Connect(ctx context.Context, connStr string) error {
     38 	if pdb.conn != nil {
     39 		logg.WarnCtxf(ctx, "already connected", "conn", pdb.conn)
     40 		panic("already connected")
     41 	}
     42 	var err error
     43 	conn, err := pgxpool.New(ctx, connStr)
     44 	if err != nil {
     45 		return err
     46 	}
     47 	pdb.conn = conn
     48 	return pdb.prepare(ctx)
     49 }
     50 
     51 // Put implements Db.
     52 func(pdb *pgDb) Put(ctx context.Context, key []byte, val []byte) error {
     53 	if !pdb.CheckPut() {
     54 		return errors.New("unsafe put and safety set")
     55 	}
     56 	k, err := pdb.ToKey(ctx, key)
     57 	if err != nil {
     58 		return err
     59 	}
     60 	tx, err := pdb.conn.Begin(ctx)
     61 	if err != nil {
     62 		return err
     63 	}
     64 	query := fmt.Sprintf("INSERT INTO %s.kv_vise (key, value) VALUES ($1, $2) ON CONFLICT(key) DO UPDATE SET value = $2;", pdb.schema)
     65 	_, err = tx.Exec(ctx, query, k, val)
     66 	if err != nil {
     67 		tx.Rollback(ctx)
     68 		return err
     69 	}
     70 	tx.Commit(ctx)
     71 	return nil
     72 }
     73 
     74 // Get implements Db.
     75 func(pdb *pgDb) Get(ctx context.Context, key []byte) ([]byte, error) {
     76 	lk, err := pdb.ToKey(ctx, key)
     77 	if err != nil {
     78 		return nil, err
     79 	}
     80 	if lk.Translation != nil {
     81 		tx, err := pdb.conn.Begin(ctx)
     82 		if err != nil {
     83 			return nil, err
     84 		}
     85 		query := fmt.Sprintf("SELECT value FROM %s.kv_vise WHERE key = $1", pdb.schema)
     86 		rs, err := tx.Query(ctx, query, lk.Translation)
     87 		if err != nil {
     88 			return nil, err
     89 		}
     90 		defer rs.Close()
     91 		if rs.Next() {
     92 			r := rs.RawValues()
     93 			return r[0], nil
     94 		}
     95 	}
     96 
     97 	tx, err := pdb.conn.Begin(ctx)
     98 	if err != nil {
     99 		return nil, err
    100 	}
    101 	query := fmt.Sprintf("SELECT value FROM %s.kv_vise WHERE key = $1", pdb.schema)
    102 	rs, err := tx.Query(ctx, query, lk.Translation)
    103 	if err != nil {
    104 		return nil, err
    105 	}
    106 	defer rs.Close()
    107 	if !rs.Next() {
    108 		return nil, db.NewErrNotFound(key)
    109 	}
    110 	r := rs.RawValues()
    111 	return r[0], nil
    112 }
    113 
    114 // Close implements Db.
    115 func(pdb *pgDb) Close() error {
    116 	pdb.Close()
    117 	return nil
    118 }
    119 
    120 // set up table
    121 func(pdb *pgDb) prepare(ctx context.Context) error {
    122 	tx, err := pdb.conn.Begin(ctx)
    123 	if err != nil {
    124 		tx.Rollback(ctx)
    125 		return err
    126 	}
    127 	query := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.kv_vise (
    128 		id SERIAL NOT NULL,
    129 		key BYTEA NOT NULL UNIQUE,
    130 		value BYTEA NOT NULL
    131 	);
    132 `, pdb.schema)
    133 	_, err = tx.Exec(ctx, query)
    134 	if err != nil {
    135 		tx.Rollback(ctx)
    136 		return err
    137 	}
    138 
    139 	err = tx.Commit(ctx)
    140 	if err != nil {
    141 		tx.Rollback(ctx)
    142 		return err
    143 	}
    144 	return nil
    145 }