// pgm // // A simple PG string query builder // // Author: Ankit Patial package pgm import ( "context" "errors" "fmt" "log/slog" "strings" "sync/atomic" "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" ) var ( poolPGX atomic.Pointer[pgxpool.Pool] ErrConnStringMissing = errors.New("connection string is empty") ) // Errors var ( ErrInitTX = errors.New("failed to init db.tx") ErrCommitTX = errors.New("failed to commit db.tx") ErrNoRows = errors.New("no data found") ) type Config struct { ConnString string MaxConns int32 MinConns int32 MaxConnLifetime time.Duration MaxConnIdleTime time.Duration } // InitPool will create new pgxpool.Pool and will keep it for its working func InitPool(conf Config) { if conf.ConnString == "" { panic(ErrConnStringMissing) } // Validate configuration if conf.MaxConns > 0 && conf.MinConns > 0 && conf.MinConns > conf.MaxConns { panic(fmt.Errorf("MinConns (%d) cannot be greater than MaxConns (%d)", conf.MinConns, conf.MaxConns)) } if conf.MaxConns < 0 || conf.MinConns < 0 { panic(errors.New("connection pool configuration cannot have negative values")) } cfg, err := pgxpool.ParseConfig(conf.ConnString) if err != nil { panic(err) } if conf.MaxConns > 0 { cfg.MaxConns = conf.MaxConns // 100 } if conf.MinConns > 0 { cfg.MinConns = conf.MinConns // 5 } if conf.MaxConnLifetime > 0 { cfg.MaxConnLifetime = conf.MaxConnLifetime // time.Minute * 10 } if conf.MaxConnIdleTime > 0 { cfg.MaxConnIdleTime = conf.MaxConnIdleTime // time.Minute * 5 } p, err := pgxpool.NewWithConfig(context.Background(), cfg) if err != nil { panic(err) } if err = p.Ping(context.Background()); err != nil { panic(err) } poolPGX.Store(p) } // GetPool instance func GetPool() *pgxpool.Pool { return poolPGX.Load() } // ClosePool closes the connection pool gracefully. // Should be called during application shutdown. func ClosePool() { if p := poolPGX.Load(); p != nil { p.Close() poolPGX.Store(nil) } } // BeginTx begins a pgx poll transaction func BeginTx(ctx context.Context) (pgx.Tx, error) { tx, err := poolPGX.Load().Begin(ctx) if err != nil { slog.Error("failed to begin transaction", "error", err) return nil, fmt.Errorf("failed to open db tx: %w", err) } return tx, nil } // IsNotFound error check func IsNotFound(err error) bool { return errors.Is(err, pgx.ErrNoRows) } // PgTime as in UTC func PgTime(t time.Time) pgtype.Timestamptz { return pgtype.Timestamptz{Time: t, Valid: true} } func PgTimeNow() pgtype.Timestamptz { return pgtype.Timestamptz{Time: time.Now(), Valid: true} } func TsAndQuery(q string) string { return strings.Join(strings.Fields(q), " & ") } func TsPrefixAndQuery(q string) string { return strings.Join(fieldsWithSufix(q, ":*"), " & ") } func TsOrQuery(q string) string { return strings.Join(strings.Fields(q), " | ") } func TsPrefixOrQuery(q string) string { return strings.Join(fieldsWithSufix(q, ":*"), " | ") } func fieldsWithSufix(v, sufix string) []string { fields := strings.Fields(v) prefixed := make([]string, len(fields)) for i, f := range fields { prefixed[i] = f + sufix } return prefixed }