Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cmd/go-tpc/ch_benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,11 @@ func registerCHBenchmark(root *cobra.Command) {
"execute explain analyze")

cmdRun.PersistentFlags().IntSliceVar(&tpccConfig.Weight, "weight", []int{45, 43, 4, 4, 4}, "Weight for NewOrder, Payment, OrderStatus, Delivery, StockLevel")
cmdRun.Flags().DurationVar(&tpccConfig.ConnRefreshInterval, "conn-refresh-interval", 0, "automatically refresh database connections at specified intervals to balance traffic across new replicas (0 = disabled, e.g., 10s)")
cmdRun.Flags().StringVar(&apConnParams, "ap-conn-params", "", "Connection parameters for analytical processing")
cmdRun.Flags().StringSliceVar(&apHosts, "ap-host", nil, "Database host for analytical processing")
cmdRun.Flags().IntSliceVar(&apPorts, "ap-port", nil, "Database port for analytical processing")

cmd.AddCommand(cmdRun, cmdPrepare)
root.AddCommand(cmd)
}
Expand Down Expand Up @@ -143,6 +145,13 @@ func executeCH(action string, openAP func() (*sql.DB, error)) {
tp, ap workload.Workloader
err error
)

// Set a reasonable connection max lifetime when auto-refresh is enabled
// This ensures connections are actually closed and not just returned to pool
if tpccConfig.ConnRefreshInterval > 0 {
globalDB.SetConnMaxLifetime(tpccConfig.ConnRefreshInterval)
fmt.Printf("Auto-setting connection max lifetime to %v (refresh interval)\n", tpccConfig.ConnRefreshInterval)
}
tp, err = tpcc.NewWorkloader(globalDB, &tpccConfig)
if err != nil {
fmt.Printf("Failed to init tp work loader: %v\n", err)
Expand Down
8 changes: 8 additions & 0 deletions cmd/go-tpc/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ func executeTpcc(action string) {
}
w, err = tpcc.NewCSVWorkloader(globalDB, &tpccConfig)
default:
// Set a reasonable connection max lifetime when auto-refresh is enabled
// This ensures connections are actually closed and not just returned to pool
if tpccConfig.ConnRefreshInterval > 0 {
globalDB.SetConnMaxLifetime(tpccConfig.ConnRefreshInterval)
fmt.Printf("Auto-setting connection max lifetime to %v (refresh interval)\n", tpccConfig.ConnRefreshInterval)
}

w, err = tpcc.NewWorkloader(globalDB, &tpccConfig)
}

Expand Down Expand Up @@ -116,6 +123,7 @@ func registerTpcc(root *cobra.Command) {
cmdRun.PersistentFlags().BoolVar(&tpccConfig.Wait, "wait", false, "including keying & thinking time described on TPC-C Standard Specification")
cmdRun.PersistentFlags().DurationVar(&tpccConfig.MaxMeasureLatency, "max-measure-latency", measurement.DefaultMaxLatency, "max measure latency in millisecond")
cmdRun.PersistentFlags().IntSliceVar(&tpccConfig.Weight, "weight", []int{45, 43, 4, 4, 4}, "Weight for NewOrder, Payment, OrderStatus, Delivery, StockLevel")
cmdRun.Flags().DurationVar(&tpccConfig.ConnRefreshInterval, "conn-refresh-interval", 0, "automatically refresh database connections at specified intervals to balance traffic across new replicas (0 = disabled, e.g., 10s)")

var cmdCleanup = &cobra.Command{
Use: "cleanup",
Expand Down
49 changes: 41 additions & 8 deletions tpcc/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type tpccState struct {
deliveryStmts map[string]*sql.Stmt
stockLevelStmt map[string]*sql.Stmt
paymentStmts map[string]*sql.Stmt

// for automatic connection refresh
lastConnRefresh time.Time
}

const (
Expand Down Expand Up @@ -84,6 +87,9 @@ type Config struct {

// output style
OutputStyle string

// automatic connection refresh interval to balance traffic across new replicas
ConnRefreshInterval time.Duration
}

// Workloader is TPCC workload
Expand Down Expand Up @@ -168,9 +174,10 @@ func (w *Workloader) Name() string {
// InitThread implements Workloader interface
func (w *Workloader) InitThread(ctx context.Context, threadID int) context.Context {
s := &tpccState{
TpcState: workload.NewTpcState(ctx, w.db),
index: 0,
decks: make([]int, 0, 23),
TpcState: workload.NewTpcState(ctx, w.db),
index: 0,
decks: make([]int, 0, 23),
lastConnRefresh: time.Now(),
}

for index, txn := range w.txns {
Expand Down Expand Up @@ -224,13 +231,39 @@ func getTPCCState(ctx context.Context) *tpccState {
}

// Run implements Workloader interface
func (w *Workloader) Run(ctx context.Context, threadID int) error {
func (w *Workloader) Run(ctx context.Context, threadID int) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic in TPC-C Run (thread %d): %v", threadID, r)
}
}()

s := getTPCCState(ctx)
refreshConn := false
if err := s.Conn.PingContext(ctx); err != nil {
if err := s.RefreshConn(ctx); err != nil {
return err

// Helper function to safely refresh connection with panic recovery
safeRefreshConn := func() error {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic during connection refresh (thread %d): %v", threadID, r)
}
}()
return s.RefreshConn(ctx)
}

// Check if automatic connection refresh is needed
if w.cfg.ConnRefreshInterval > 0 && time.Since(s.lastConnRefresh) >= w.cfg.ConnRefreshInterval {
if err := safeRefreshConn(); err != nil {
return fmt.Errorf("automatic connection refresh failed (thread %d): %w", threadID, err)
}
s.lastConnRefresh = time.Now()
refreshConn = true
} else if err := s.Conn.PingContext(ctx); err != nil {
// Fallback to ping-based refresh if automatic refresh didn't happen
if err := safeRefreshConn(); err != nil {
return fmt.Errorf("ping-based connection refresh failed (thread %d): %w", threadID, err)
}
s.lastConnRefresh = time.Now()
refreshConn = true
}
if s.newOrderStmts == nil || refreshConn {
Expand Down Expand Up @@ -308,7 +341,7 @@ func (w *Workloader) Run(ctx context.Context, threadID int) error {
}

start := time.Now()
err := txn.action(ctx, threadID)
err = txn.action(ctx, threadID)

w.rtMeasurement.Measure(txn.name, time.Now().Sub(start), err)

Expand Down