forked from yuxh/gearman-go
		
	Replace mutex in client.do() with a channel to avoid deadlock and introduce command timeout
This commit is contained in:
		
							parent
							
								
									df1af4f8cb
								
							
						
					
					
						commit
						c615e74af8
					
				@ -6,6 +6,7 @@ import (
 | 
			
		||||
	"bufio"
 | 
			
		||||
	"net"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// One client connect to one server.
 | 
			
		||||
@ -20,6 +21,8 @@ type Client struct {
 | 
			
		||||
	conn                net.Conn
 | 
			
		||||
	rw                  *bufio.ReadWriter
 | 
			
		||||
 | 
			
		||||
	RespTimeout         time.Duration  // response timeout for do() in ms
 | 
			
		||||
 | 
			
		||||
	ErrorHandler ErrorHandler
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -31,6 +34,7 @@ func New(network, addr string) (client *Client, err error) {
 | 
			
		||||
		respHandler:  make(map[string]ResponseHandler, queueSize),
 | 
			
		||||
		innerHandler: make(map[string]ResponseHandler, queueSize),
 | 
			
		||||
		in:           make(chan *Response, queueSize),
 | 
			
		||||
		RespTimeout:  1000,
 | 
			
		||||
	}
 | 
			
		||||
	client.conn, err = net.Dial(client.net, client.addr)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@ -175,21 +179,26 @@ func (client *Client) handleInner(key string, resp *Response) *Response {
 | 
			
		||||
	return resp
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type handleOrError struct {
 | 
			
		||||
	handle string
 | 
			
		||||
	err error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (client *Client) do(funcname string, data []byte,
 | 
			
		||||
	flag uint32) (handle string, err error) {
 | 
			
		||||
	if client.conn == nil {
 | 
			
		||||
		return "", ErrLostConn
 | 
			
		||||
	}
 | 
			
		||||
	var mutex sync.Mutex
 | 
			
		||||
	mutex.Lock()
 | 
			
		||||
	var result = make(chan handleOrError, 1)
 | 
			
		||||
	client.lastcall = "c"
 | 
			
		||||
	client.innerHandler["c"] = func(resp *Response) {
 | 
			
		||||
		defer mutex.Unlock()
 | 
			
		||||
		if resp.DataType == dtError {
 | 
			
		||||
			err = getError(resp.Data)
 | 
			
		||||
			result <- handleOrError{"", err}
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		handle = resp.Handle
 | 
			
		||||
		result <- handleOrError{handle, nil}
 | 
			
		||||
	}
 | 
			
		||||
	id := IdGen.Id()
 | 
			
		||||
	req := getJob(id, []byte(funcname), data)
 | 
			
		||||
@ -199,7 +208,15 @@ func (client *Client) do(funcname string, data []byte,
 | 
			
		||||
		client.lastcall = ""
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	mutex.Lock()
 | 
			
		||||
	var timer = time.After(client.RespTimeout * time.Millisecond)
 | 
			
		||||
	select {
 | 
			
		||||
	case ret := <-result:
 | 
			
		||||
		return ret.handle, ret.err
 | 
			
		||||
	case <-timer:
 | 
			
		||||
		delete(client.innerHandler, "c")
 | 
			
		||||
		client.lastcall = ""
 | 
			
		||||
		return "", ErrLostConn
 | 
			
		||||
	}
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user