198 lines
5.1 KiB
Go
198 lines
5.1 KiB
Go
// Package pgm is a simple PG (PostgreSQL) string query builder.
//
// Author: Ankit Patial
package pgm
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgtype"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
)
|
|
|
|
var (
|
|
poolPGX atomic.Pointer[pgxpool.Pool]
|
|
ErrConnStringMissing = errors.New("connection string is empty")
|
|
)
|
|
|
|
// Sentinel errors exposed by pgm. Compare against them with errors.Is.
//
// NOTE(review): none of these are referenced in the portion of the file
// reviewed here; presumably other files of the package return them — verify.
var (
	// ErrInitTX reports that a database transaction could not be started.
	ErrInitTX = errors.New("failed to init db.tx")

	// ErrCommitTX reports that a database transaction could not be committed.
	ErrCommitTX = errors.New("failed to commit db.tx")

	// ErrNoRows reports that no data was found. It is a different value
	// from pgx.ErrNoRows.
	ErrNoRows = errors.New("no data found")
)
|
|
|
|
// Config carries the settings InitPool uses to build the connection pool.
//
// Only ConnString is mandatory. Every other field is applied by InitPool
// only when it is greater than zero; otherwise the value produced by
// pgxpool.ParseConfig for the connection string is kept.
type Config struct {
	// ConnString is the PostgreSQL connection string (URL or keyword/value
	// form, as accepted by pgxpool.ParseConfig). InitPool panics if it is empty.
	ConnString string

	// MaxConns and MinConns bound the number of pooled connections.
	// InitPool rejects negative values, and rejects MinConns > MaxConns
	// when both are positive.
	MaxConns, MinConns int32

	// MaxConnLifetime and MaxConnIdleTime override the pool's connection
	// lifetime and idle-time limits when positive.
	MaxConnLifetime, MaxConnIdleTime time.Duration
}
|
|
|
|
// InitPool initializes the connection pool with the provided configuration.
|
|
// It validates the configuration and panics if invalid.
|
|
// This function should be called once at application startup.
|
|
//
|
|
// Example:
|
|
//
|
|
// pgm.InitPool(pgm.Config{
|
|
// ConnString: "postgres://user:pass@localhost/dbname",
|
|
// MaxConns: 100,
|
|
// MinConns: 5,
|
|
// })
|
|
func InitPool(conf Config) {
|
|
if conf.ConnString == "" {
|
|
panic(ErrConnStringMissing)
|
|
}
|
|
|
|
// Validate configuration
|
|
if conf.MaxConns > 0 && conf.MinConns > 0 && conf.MinConns > conf.MaxConns {
|
|
panic(fmt.Errorf("MinConns (%d) cannot be greater than MaxConns (%d)", conf.MinConns, conf.MaxConns))
|
|
}
|
|
|
|
if conf.MaxConns < 0 || conf.MinConns < 0 {
|
|
panic(errors.New("connection pool configuration cannot have negative values"))
|
|
}
|
|
|
|
cfg, err := pgxpool.ParseConfig(conf.ConnString)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if conf.MaxConns > 0 {
|
|
cfg.MaxConns = conf.MaxConns // 100
|
|
}
|
|
|
|
if conf.MinConns > 0 {
|
|
cfg.MinConns = conf.MinConns // 5
|
|
}
|
|
|
|
if conf.MaxConnLifetime > 0 {
|
|
cfg.MaxConnLifetime = conf.MaxConnLifetime // time.Minute * 10
|
|
}
|
|
|
|
if conf.MaxConnIdleTime > 0 {
|
|
cfg.MaxConnIdleTime = conf.MaxConnIdleTime // time.Minute * 5
|
|
}
|
|
|
|
p, err := pgxpool.NewWithConfig(context.Background(), cfg)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if err = p.Ping(context.Background()); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
poolPGX.Store(p)
|
|
}
|
|
|
|
// GetPool returns the initialized connection pool instance.
|
|
// It panics with a descriptive message if InitPool() has not been called.
|
|
// This is a fail-fast approach to catch programming errors early.
|
|
func GetPool() *pgxpool.Pool {
|
|
p := poolPGX.Load()
|
|
if p == nil {
|
|
panic("pgm: connection pool not initialized, call InitPool() first")
|
|
}
|
|
return p
|
|
}
|
|
|
|
// ClosePool closes the connection pool gracefully.
|
|
// Should be called during application shutdown.
|
|
func ClosePool() {
|
|
if p := poolPGX.Load(); p != nil {
|
|
p.Close()
|
|
poolPGX.Store(nil)
|
|
}
|
|
}
|
|
|
|
// BeginTx begins a new database transaction from the connection pool.
|
|
// Returns an error if the transaction cannot be started.
|
|
// Remember to commit or rollback the transaction when done.
|
|
//
|
|
// Example:
|
|
//
|
|
// tx, err := pgm.BeginTx(ctx)
|
|
// if err != nil {
|
|
// return err
|
|
// }
|
|
// defer tx.Rollback(ctx) // rollback on error
|
|
//
|
|
// // ... do work ...
|
|
//
|
|
// return tx.Commit(ctx)
|
|
func BeginTx(ctx context.Context) (pgx.Tx, error) {
|
|
tx, err := poolPGX.Load().Begin(ctx)
|
|
if err != nil {
|
|
slog.Error("failed to begin transaction", "error", err)
|
|
return nil, fmt.Errorf("failed to open db tx: %w", err)
|
|
}
|
|
|
|
return tx, nil
|
|
}
|
|
|
|
// IsNotFound checks if an error is a "no rows" error from pgx.
|
|
// Returns true if the error indicates no rows were found in a query result.
|
|
func IsNotFound(err error) bool {
|
|
return errors.Is(err, pgx.ErrNoRows)
|
|
}
|
|
|
|
// PgTime converts a Go time.Time to PostgreSQL timestamptz type.
|
|
// The time is stored as-is (preserves timezone information).
|
|
func PgTime(t time.Time) pgtype.Timestamptz {
|
|
return pgtype.Timestamptz{Time: t, Valid: true}
|
|
}
|
|
|
|
// PgTimeNow returns the current time as PostgreSQL timestamptz type.
|
|
func PgTimeNow() pgtype.Timestamptz {
|
|
return pgtype.Timestamptz{Time: time.Now(), Valid: true}
|
|
}
|
|
|
|
// TsAndQuery splits q on whitespace and joins the resulting terms with the
// text-search AND operator. Terms are passed through as-is (no escaping).
//
// Example: "hello world" becomes "hello & world".
func TsAndQuery(q string) string {
	terms := strings.Fields(q)
	return strings.Join(terms, " & ")
}
|
|
|
|
// TsPrefixAndQuery converts a text search query to use AND operator with prefix matching.
|
|
// Example: "hello world" becomes "hello:* & world:*"
|
|
func TsPrefixAndQuery(q string) string {
|
|
return strings.Join(fieldsWithSufix(q, ":*"), " & ")
|
|
}
|
|
|
|
// TsOrQuery splits q on whitespace and joins the resulting terms with the
// text-search OR operator. Terms are passed through as-is (no escaping).
//
// Example: "hello world" becomes "hello | world".
func TsOrQuery(q string) string {
	terms := strings.Fields(q)
	return strings.Join(terms, " | ")
}
|
|
|
|
// TsPrefixOrQuery converts a text search query to use OR operator with prefix matching.
|
|
// Example: "hello world" becomes "hello:* | world:*"
|
|
func TsPrefixOrQuery(q string) string {
|
|
return strings.Join(fieldsWithSufix(q, ":*"), " | ")
|
|
}
|
|
|
|
// fieldsWithSufix splits v on whitespace and returns the resulting words,
// each with sufix appended. The result is an empty, non-nil slice when v
// contains no words.
func fieldsWithSufix(v, sufix string) []string {
	words := strings.Fields(v)
	suffixed := make([]string, 0, len(words))
	for _, w := range words {
		suffixed = append(suffixed, w+sufix)
	}

	return suffixed
}
|