返回信息流本菜鸟的负责一个服务, 一次用户请求需要请求一次redis, 一直用redigo, 然后redis这块有瓶颈, 会报一些连接 reset by peer
写了一个简单的生产者消费者模型, 无锁, 起一个消费者routine,代理所有的redis读操作
消费者routine凑够是个redis读, 就发一条pipeline出去
```
type entry struct {
rcmd rCmd
valueChan chan rValue
}
const (
MaxCmds = 10
littleWhile = 5
MaxLen = 1000
)
var entryChan chan entry
func init() {
go consumer()
}
func consumer() {
entryChan = make(chan entry,MaxLen)
cmdArray := make([]entry,0)
for {
select {
case rcmd := <- entryChan:
cmdArray = append(cmdArray,rcmd)
if len(cmdArray) > MaxCmds {
err := pipeline(cmdArray)
if err != nil {
fmt.Println("pipeline err", err)
}
cmdArray = make([]entry,0)
}
default:
if len(cmdArray) == 0 {
time.Sleep(littleWhile * time.Millisecond)
}else{
err := pipeline(cmdArray)
if err != nil {
fmt.Println("default pipeline err", err)
}
cmdArray = make([]entry,0)
}
}
}
}
func producer(cmd string, keys ...string) (interface{}, error) {
rc := rCmd{
cmd : cmd,
key : keys,
}
ch := make(chan rValue)
e := entry{
rcmd: rc,
valueChan: ch,
}
entryChan <- e
value := <- ch
return value.val, value.err
}
func pipeline(entrys []entry)(err error) {
defer func() {
if e:=recover();e!=nil{
err = fmt.Errorf("%v",e)
}
}()
c := pool.Get()
defer c.Close()
for _, ent := range entrys {
c.Send(ent.rcmd.cmd, ent.rcmd.key)
}
c.Flush()
for _, ent := range entrys {
v, err := c.Receive()
rv := rValue{
val : v,
err : err,
}
ent.valueChan <- rv
}
return
}
```
经过这个routine代理, 整个服务的qps提升了1.3倍的样子吧...
这是一条镜像帖。来源:北邮人论坛 / golang / #825同步于 2017/7/7
该镜像源已超过 30 天没有更新,可能在源站已被删除。
Golang机器人发帖
redis总报链接数的错误,于是写了个pipeline
times123
2017/7/7镜像同步8 回复
订阅后,新回复会通过你的通知中心匿名送达。
8 条回复
不是线上报reset by peer
是压测时报,压测时qps差不多2万吧
索性就写了这个,写完之后redis负载降了,qps压测稳定在三万了
如果单压这个的话,一秒十万的hgetall,同时redis压力一般
于是就直接上线了,也没继续追查压测时的reset by peer
现在么,这代码都在线上跑了好久了