forked from yuxh/gearman-go
		
	Merge pull request #60 from micmac/master
Replace mutex in client.do() with a channel to avoid deadlock and int…
This commit is contained in:
		
						commit
						503d523dbf
					
				@ -6,6 +6,7 @@ import (
 | 
				
			|||||||
	"bufio"
 | 
						"bufio"
 | 
				
			||||||
	"net"
 | 
						"net"
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// One client connect to one server.
 | 
					// One client connect to one server.
 | 
				
			||||||
@ -20,6 +21,8 @@ type Client struct {
 | 
				
			|||||||
	conn                net.Conn
 | 
						conn                net.Conn
 | 
				
			||||||
	rw                  *bufio.ReadWriter
 | 
						rw                  *bufio.ReadWriter
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						RespTimeout         time.Duration  // response timeout for do() in ms
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ErrorHandler ErrorHandler
 | 
						ErrorHandler ErrorHandler
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -31,6 +34,7 @@ func New(network, addr string) (client *Client, err error) {
 | 
				
			|||||||
		respHandler:  make(map[string]ResponseHandler, queueSize),
 | 
							respHandler:  make(map[string]ResponseHandler, queueSize),
 | 
				
			||||||
		innerHandler: make(map[string]ResponseHandler, queueSize),
 | 
							innerHandler: make(map[string]ResponseHandler, queueSize),
 | 
				
			||||||
		in:           make(chan *Response, queueSize),
 | 
							in:           make(chan *Response, queueSize),
 | 
				
			||||||
 | 
							RespTimeout:  1000,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	client.conn, err = net.Dial(client.net, client.addr)
 | 
						client.conn, err = net.Dial(client.net, client.addr)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
@ -175,21 +179,26 @@ func (client *Client) handleInner(key string, resp *Response) *Response {
 | 
				
			|||||||
	return resp
 | 
						return resp
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type handleOrError struct {
 | 
				
			||||||
 | 
						handle string
 | 
				
			||||||
 | 
						err error
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (client *Client) do(funcname string, data []byte,
 | 
					func (client *Client) do(funcname string, data []byte,
 | 
				
			||||||
	flag uint32) (handle string, err error) {
 | 
						flag uint32) (handle string, err error) {
 | 
				
			||||||
	if client.conn == nil {
 | 
						if client.conn == nil {
 | 
				
			||||||
		return "", ErrLostConn
 | 
							return "", ErrLostConn
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	var mutex sync.Mutex
 | 
						var result = make(chan handleOrError, 1)
 | 
				
			||||||
	mutex.Lock()
 | 
					 | 
				
			||||||
	client.lastcall = "c"
 | 
						client.lastcall = "c"
 | 
				
			||||||
	client.innerHandler["c"] = func(resp *Response) {
 | 
						client.innerHandler["c"] = func(resp *Response) {
 | 
				
			||||||
		defer mutex.Unlock()
 | 
					 | 
				
			||||||
		if resp.DataType == dtError {
 | 
							if resp.DataType == dtError {
 | 
				
			||||||
			err = getError(resp.Data)
 | 
								err = getError(resp.Data)
 | 
				
			||||||
 | 
								result <- handleOrError{"", err}
 | 
				
			||||||
			return
 | 
								return
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		handle = resp.Handle
 | 
							handle = resp.Handle
 | 
				
			||||||
 | 
							result <- handleOrError{handle, nil}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	id := IdGen.Id()
 | 
						id := IdGen.Id()
 | 
				
			||||||
	req := getJob(id, []byte(funcname), data)
 | 
						req := getJob(id, []byte(funcname), data)
 | 
				
			||||||
@ -199,7 +208,15 @@ func (client *Client) do(funcname string, data []byte,
 | 
				
			|||||||
		client.lastcall = ""
 | 
							client.lastcall = ""
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	mutex.Lock()
 | 
						var timer = time.After(client.RespTimeout * time.Millisecond)
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case ret := <-result:
 | 
				
			||||||
 | 
							return ret.handle, ret.err
 | 
				
			||||||
 | 
						case <-timer:
 | 
				
			||||||
 | 
							delete(client.innerHandler, "c")
 | 
				
			||||||
 | 
							client.lastcall = ""
 | 
				
			||||||
 | 
							return "", ErrLostConn
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	return
 | 
						return
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
		Reference in New Issue
	
	Block a user