diff --git a/src/collector/collector.go b/src/collector/collector.go index 8f1fe25721abb297864e039ee9fc5f6c6a734182..a41456152b8a751efa1b3f55d14ea4814ee2b476 100644 --- a/src/collector/collector.go +++ b/src/collector/collector.go @@ -20,6 +20,13 @@ var ( Help: "Number of backend connections for a pool.", }, []string{"pool_id"}) + // 某个连接池的活跃后端连接数目 + AcitveBackendConnections = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "zqpool", + Name: "active_backend_connections", + Help: "Number of active backend connections for a pool.", + }, []string{"pool_id"}) + // 某个连接池的前端连接数目 FrontendConnections = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "zqpool", @@ -69,6 +76,10 @@ func UpdateBackendConnections(labelValue string, value float64) { BackendConnections.WithLabelValues(labelValue).Set(value) } +func UpdateAcitveBackendConnections(labelValue string, value float64) { + AcitveBackendConnections.WithLabelValues(labelValue).Set(value) +} + func FrontendConnectionsIncrease(labelValue string) { FrontendConnections.With(prometheus.Labels{"pool_id": labelValue}).Inc() } diff --git a/src/main.go b/src/main.go index af8cca5a066f1c2c7dee530ffd72a9c6a2ee5615..bf9bfc88cac12c3eba7baad00cbd86909aebdd41 100644 --- a/src/main.go +++ b/src/main.go @@ -12,10 +12,10 @@ import ( "log" "time" + collector "csudata.com/zqpool/src/collector" config "csudata.com/zqpool/src/config" mgrhttp "csudata.com/zqpool/src/mgrhttp" poolserver "csudata.com/zqpool/src/poolserver" - collector "csudata.com/zqpool/src/collector" ) func main() { @@ -57,17 +57,6 @@ func main() { go collector.StartPromServer() - ticker := time.NewTicker(10 * time.Millisecond) - - go func() { - for { - select { - case <-ticker.C: - poolserver.CollectBackendConnections() - } - } - }() - go mgrhttp.StartHttpServer() var listenIp string diff --git a/src/poolserver/extended_query.go b/src/poolserver/extended_query.go index 816dc0812961a6e6f0732af9a0d8ee4d97904aca..398901961d2d174db0a299218a28cb4e5259a5de 100644 --- a/src/poolserver/extended_query.go +++ b/src/poolserver/extended_query.go @@ -225,11 +225,11 @@ func (ctx *ConnContext) reSendPrepareData(needPrepareMap map[string]*PrepareIdDa var isSuccess bool for prepareName, prepareIdData := range needPrepareMap { - zap.S().Infof("Client(%d): SB(%d): reprepare %s", ctx.Pid, ctx.pBackConn.Id, prepareName) if prepareName == "" { /*这是未命名的prepare*/ beginPos = 0 PrepareRequest = prepareIdData.PrepareRequest } else { + zap.S().Infof("Client(%d): SB(%d): reprepare %s", ctx.Pid, ctx.pBackConn.Id, prepareName) backendPrepareName = fmt.Sprintf("S%d", prepareIdData.PrepareId) //backendPrepareName = fmt.Sprintf("S%d", pib.PrepareId) newPrepareNameLen = int32(len([]byte(backendPrepareName))) @@ -252,7 +252,7 @@ func (ctx *ConnContext) reSendPrepareData(needPrepareMap map[string]*PrepareIdDa zap.S().Infof("Client(%d): reconnect backend(%d)", ctx.Pid, ctx.pBackConn.Id) ctx.BeReconnect() if ctx.isGetBackend { - zap.S().Infof("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) + zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) atomic.StoreInt32(&ctx.pBackConn.State, 0) ctx.isGetBackend = false } @@ -267,7 +267,7 @@ func (ctx *ConnContext) reSendPrepareData(needPrepareMap map[string]*PrepareIdDa zap.S().Infof("Client(%d): reconnect backend(%d)", ctx.Pid, ctx.pBackConn.Id) ctx.BeReconnect() if ctx.isGetBackend { - zap.S().Infof("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) + zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) atomic.StoreInt32(&ctx.pBackConn.State, 0) ctx.isGetBackend = false } @@ -283,6 +283,7 @@ func (ctx *ConnContext) reSendPrepareData(needPrepareMap map[string]*PrepareIdDa ctx.cliConn.Close() ctx.BeReconnect() if ctx.isGetBackend { + zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) atomic.StoreInt32(&ctx.pBackConn.State, 0) ctx.isGetBackend = false } @@ -340,6 +341,7 @@ func (ctx *ConnContext) ProcessExtendedQuery() error { /* 处理Extended Query /* 现获得一个backend */ if !ctx.isGetBackend { ctx.pBackConn = getBackend(ctx.pool) + zap.S().Debugf("Client(%d): get backend(%d)", ctx.Pid, ctx.pBackConn.Id) ctx.isGetBackend = true //TCHDEBUG zap.S().Infof("Client(%d): hold backend(%d)", ctx.Pid, ctx.pBackConn.Id) } @@ -478,7 +480,7 @@ func (ctx *ConnContext) handlePrepareNameNotExists(prepareName string) error { CleanupTrans(ctx.pBackConn.Conn) } if ctx.isGetBackend { - zap.S().Infof("Client(%d, B): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) + zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) atomic.StoreInt32(&ctx.pBackConn.State, 0) ctx.isGetBackend = false } @@ -508,7 +510,7 @@ func (ctx *ConnContext) handlePrepareNameNotExists(prepareName string) error { CleanupTrans(ctx.pBackConn.Conn) } if ctx.isGetBackend { - zap.S().Infof("Client(%d, B): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) + zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) atomic.StoreInt32(&ctx.pBackConn.State, 0) ctx.isGetBackend = false } @@ -522,7 +524,7 @@ func (ctx *ConnContext) handlePrepareNameNotExists(prepareName string) error { CleanupTrans(ctx.pBackConn.Conn) } if ctx.isGetBackend { - zap.S().Infof("Client(%d, B): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) + zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) atomic.StoreInt32(&ctx.pBackConn.State, 0) ctx.isGetBackend = false } diff --git a/src/poolserver/fe_be_logic.go b/src/poolserver/fe_be_logic.go index b632a4c38680bb3112ea62f08fa22b37887e543d..694988c17321d2b24ad42a7ac9b5d86421aebc39 100644 --- a/src/poolserver/fe_be_logic.go +++ b/src/poolserver/fe_be_logic.go @@ -6,8 +6,8 @@ import ( "sync/atomic" "time" - "go.uber.org/zap" collector "csudata.com/zqpool/src/collector" + "go.uber.org/zap" ) /*获得第一个消息包*/ @@ -139,6 +139,7 @@ func (ctx *ConnContext) ProcessX() { /* Terminate消息*/ CleanupTrans(ctx.pBackConn.Conn) } if ctx.isGetBackend { + zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) atomic.StoreInt32(&ctx.pBackConn.State, 0) } ctx.cliConn.Close() @@ -153,6 +154,7 @@ func (ctx *ConnContext) ProcessQ() error { /* 这是 Query 简单查询 */ if !ctx.isGetBackend { //zap.S().Infof("Get connect from pool.") ctx.pBackConn = getBackend(ctx.pool) + zap.S().Debugf("Client(%d): get backend(%d)", ctx.Pid, ctx.pBackConn.Id) ctx.isGetBackend = true //TCHDEBUG zap.S().Infof("Client(%d, Q): hold backend(%d)", ctx.Pid, ctx.pBackConn.Id) //reConnCnt = ctx.pBackConn.ReConnCnt @@ -322,7 +324,7 @@ success: /*把临时放prepare data的map清空*/ ctx.cupreMap = make(map[string]*CuPre) if ctx.transState == 'I' { /*释放后端连接*/ - //TCHDEBUG zap.S().Infof("Client(%d, S): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) + zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) atomic.StoreInt32(&ctx.pBackConn.State, 0) ctx.isGetBackend = false ctx.pBackConn = nil @@ -342,7 +344,7 @@ func (ctx *ConnContext) sendToBackend() error { zap.S().Infof("Client(%d): reconnect backend(%d)", ctx.Pid, ctx.pBackConn.Id) ctx.BeReconnect() if ctx.isGetBackend { - zap.S().Infof("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) + zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) atomic.StoreInt32(&ctx.pBackConn.State, 0) ctx.isGetBackend = false } @@ -368,6 +370,7 @@ func (ctx *ConnContext) sendToBackendWithRetry() error { zap.S().Infof("Client(%d): SB(%d): reconnect backend, close client", ctx.Pid, ctx.pBackConn.Id) ctx.cliConn.Close() ctx.BeReconnect() + zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) atomic.StoreInt32(&ctx.pBackConn.State, 0) return err } @@ -383,6 +386,7 @@ func (ctx *ConnContext) handleError() { CleanupTrans(ctx.pBackConn.Conn) } if ctx.isGetBackend { + zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) atomic.StoreInt32(&ctx.pBackConn.State, 0) ctx.isGetBackend = false } @@ -409,6 +413,7 @@ func (ctx *ConnContext) ProcessC() error { /* 处理close消息 */ CleanupTrans(ctx.pBackConn.Conn) } if ctx.isGetBackend { + zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) atomic.StoreInt32(&ctx.pBackConn.State, 0) ctx.isGetBackend = false } @@ -423,6 +428,7 @@ func (ctx *ConnContext) ProcessC() error { /* 处理close消息 */ CleanupTrans(ctx.pBackConn.Conn) } if ctx.isGetBackend { + zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) atomic.StoreInt32(&ctx.pBackConn.State, 0) ctx.isGetBackend = false } @@ -452,6 +458,7 @@ func (ctx *ConnContext) ProcessC() error { /* 处理close消息 */ CleanupTrans(ctx.pBackConn.Conn) } if ctx.isGetBackend { + zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) atomic.StoreInt32(&ctx.pBackConn.State, 0) ctx.isGetBackend = false } @@ -469,6 +476,7 @@ func (ctx *ConnContext) ProcessC() error { /* 处理close消息 */ CleanupTrans(ctx.pBackConn.Conn) } if ctx.isGetBackend { + zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) atomic.StoreInt32(&ctx.pBackConn.State, 0) ctx.isGetBackend = false } @@ -484,7 +492,7 @@ func (ctx *ConnContext) ProcessC() error { /* 处理close消息 */ CleanupTrans(ctx.pBackConn.Conn) } if ctx.isGetBackend { - zap.S().Infof("Client(%d, B): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) + zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) atomic.StoreInt32(&ctx.pBackConn.State, 0) ctx.isGetBackend = false } @@ -522,6 +530,7 @@ func handleConnection(cliConn net.Conn) { CleanupTrans(ctx.pBackConn.Conn) } if ctx.isGetBackend { + zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id) atomic.StoreInt32(&ctx.pBackConn.State, 0) ctx.isGetBackend = false } @@ -560,8 +569,8 @@ func handleConnection(cliConn net.Conn) { } continue } else { - zap.S().Panicf("unknown message(%c)", ctx.msgList[0][0]) collector.ActiveFrontendConnectionsDecrease(pool.Conf.ID) + zap.S().Panicf("unknown message(%c)", ctx.msgList[0][0]) } //zap.S().Panicf("Client(%d): Unknown type(%c): %v", ctx.Pid, ctx.recvBuf[0], ctx.recvBuf[:ctx.recvLen]) } diff --git a/src/poolserver/poolserver.go b/src/poolserver/poolserver.go index 55e367471789ec485c3636af140d1531c3173c76..289996aeccf8d3ed9c32ce24ca07031df781eed4 100644 --- a/src/poolserver/poolserver.go +++ b/src/poolserver/poolserver.go @@ -459,6 +459,22 @@ func CollectBackendConnections() { } } +func CollectActiveBackendConnections() { + var pool *Pool + var backendConn *PgBackendConn + var activeBackendConnNum float64 = 0 + for _, pool = range g_backend_pool { + for _, backendConn = range pool.BeConns { + stateValue := atomic.LoadInt32(&backendConn.State) + if stateValue == 1 { + activeBackendConnNum += 1 + } + } + collector.UpdateAcitveBackendConnections(pool.Conf.ID, activeBackendConnNum) + activeBackendConnNum = 0 + } +} + func PoolReleaseBeDb(poolName string, portal string) error { var pool *Pool var backConn *PgBackendConn @@ -537,6 +553,18 @@ func StartServer(listenAddr string) { return } + ticker := time.NewTicker(10 * time.Millisecond) + + go func() { + for { + select { + case <-ticker.C: + CollectBackendConnections() + CollectActiveBackendConnections() + } + } + }() + for { con, err := listener.Accept() if err != nil {