forked from yuxh/gearman-go
concurrent map bug fixed
This commit is contained in:
parent
99c8032384
commit
dd82e211a3
@ -19,8 +19,8 @@ type Client struct {
|
|||||||
sync.Mutex
|
sync.Mutex
|
||||||
|
|
||||||
net, addr, lastcall string
|
net, addr, lastcall string
|
||||||
respHandler map[string]ResponseHandler
|
respHandler *responseHandlerMap
|
||||||
innerHandler map[string]ResponseHandler
|
innerHandler *responseHandlerMap
|
||||||
in chan *Response
|
in chan *Response
|
||||||
conn net.Conn
|
conn net.Conn
|
||||||
rw *bufio.ReadWriter
|
rw *bufio.ReadWriter
|
||||||
@ -30,13 +30,40 @@ type Client struct {
|
|||||||
ErrorHandler ErrorHandler
|
ErrorHandler ErrorHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type responseHandlerMap struct {
|
||||||
|
sync.RWMutex
|
||||||
|
holder map[string]ResponseHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
func newResponseHandlerMap() *responseHandlerMap {
|
||||||
|
return &responseHandlerMap{holder: make(map[string]ResponseHandler, queueSize)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *responseHandlerMap) remove(key string) {
|
||||||
|
r.Lock()
|
||||||
|
delete(r.holder, key)
|
||||||
|
r.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *responseHandlerMap) get(key string) (ResponseHandler, bool) {
|
||||||
|
r.RLock()
|
||||||
|
rh, b := r.holder[key]
|
||||||
|
r.RUnlock()
|
||||||
|
return rh, b
|
||||||
|
}
|
||||||
|
func (r *responseHandlerMap) put(key string, rh ResponseHandler) {
|
||||||
|
r.Lock()
|
||||||
|
r.holder[key] = rh
|
||||||
|
r.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
// Return a client.
|
// Return a client.
|
||||||
func New(network, addr string) (client *Client, err error) {
|
func New(network, addr string) (client *Client, err error) {
|
||||||
client = &Client{
|
client = &Client{
|
||||||
net: network,
|
net: network,
|
||||||
addr: addr,
|
addr: addr,
|
||||||
respHandler: make(map[string]ResponseHandler, queueSize),
|
respHandler: newResponseHandlerMap(),
|
||||||
innerHandler: make(map[string]ResponseHandler, queueSize),
|
innerHandler: newResponseHandlerMap(),
|
||||||
in: make(chan *Response, queueSize),
|
in: make(chan *Response, queueSize),
|
||||||
ResponseTimeout: DefaultTimeout,
|
ResponseTimeout: DefaultTimeout,
|
||||||
}
|
}
|
||||||
@ -155,7 +182,7 @@ func (client *Client) processLoop() {
|
|||||||
resp = client.handleResponse(resp.Handle, resp)
|
resp = client.handleResponse(resp.Handle, resp)
|
||||||
case dtWorkComplete, dtWorkFail, dtWorkException:
|
case dtWorkComplete, dtWorkFail, dtWorkException:
|
||||||
client.handleResponse(resp.Handle, resp)
|
client.handleResponse(resp.Handle, resp)
|
||||||
delete(client.respHandler, resp.Handle)
|
client.respHandler.remove(resp.Handle)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -167,7 +194,7 @@ func (client *Client) err(e error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (client *Client) handleResponse(key string, resp *Response) *Response {
|
func (client *Client) handleResponse(key string, resp *Response) *Response {
|
||||||
if h, ok := client.respHandler[key]; ok {
|
if h, ok := client.respHandler.get(key); ok {
|
||||||
h(resp)
|
h(resp)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -175,9 +202,9 @@ func (client *Client) handleResponse(key string, resp *Response) *Response {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (client *Client) handleInner(key string, resp *Response) *Response {
|
func (client *Client) handleInner(key string, resp *Response) *Response {
|
||||||
if h, ok := client.innerHandler[key]; ok {
|
if h, ok := client.innerHandler.get(key); ok {
|
||||||
h(resp)
|
h(resp)
|
||||||
delete(client.innerHandler, key)
|
client.innerHandler.remove(key)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return resp
|
return resp
|
||||||
@ -195,7 +222,7 @@ func (client *Client) do(funcname string, data []byte,
|
|||||||
}
|
}
|
||||||
var result = make(chan handleOrError, 1)
|
var result = make(chan handleOrError, 1)
|
||||||
client.lastcall = "c"
|
client.lastcall = "c"
|
||||||
client.innerHandler["c"] = func(resp *Response) {
|
client.innerHandler.put("c", func(resp *Response) {
|
||||||
if resp.DataType == dtError {
|
if resp.DataType == dtError {
|
||||||
err = getError(resp.Data)
|
err = getError(resp.Data)
|
||||||
result <- handleOrError{"", err}
|
result <- handleOrError{"", err}
|
||||||
@ -203,12 +230,12 @@ func (client *Client) do(funcname string, data []byte,
|
|||||||
}
|
}
|
||||||
handle = resp.Handle
|
handle = resp.Handle
|
||||||
result <- handleOrError{handle, nil}
|
result <- handleOrError{handle, nil}
|
||||||
}
|
})
|
||||||
id := IdGen.Id()
|
id := IdGen.Id()
|
||||||
req := getJob(id, []byte(funcname), data)
|
req := getJob(id, []byte(funcname), data)
|
||||||
req.DataType = flag
|
req.DataType = flag
|
||||||
if err = client.write(req); err != nil {
|
if err = client.write(req); err != nil {
|
||||||
delete(client.innerHandler, "c")
|
client.innerHandler.remove("c")
|
||||||
client.lastcall = ""
|
client.lastcall = ""
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -217,7 +244,7 @@ func (client *Client) do(funcname string, data []byte,
|
|||||||
case ret := <-result:
|
case ret := <-result:
|
||||||
return ret.handle, ret.err
|
return ret.handle, ret.err
|
||||||
case <-timer:
|
case <-timer:
|
||||||
delete(client.innerHandler, "c")
|
client.innerHandler.remove("c")
|
||||||
client.lastcall = ""
|
client.lastcall = ""
|
||||||
return "", ErrLostConn
|
return "", ErrLostConn
|
||||||
}
|
}
|
||||||
@ -239,7 +266,7 @@ func (client *Client) Do(funcname string, data []byte,
|
|||||||
}
|
}
|
||||||
handle, err = client.do(funcname, data, datatype)
|
handle, err = client.do(funcname, data, datatype)
|
||||||
if err == nil && h != nil {
|
if err == nil && h != nil {
|
||||||
client.respHandler[handle] = h
|
client.respHandler.put(handle, h)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -272,14 +299,14 @@ func (client *Client) Status(handle string) (status *Status, err error) {
|
|||||||
var mutex sync.Mutex
|
var mutex sync.Mutex
|
||||||
mutex.Lock()
|
mutex.Lock()
|
||||||
client.lastcall = "s" + handle
|
client.lastcall = "s" + handle
|
||||||
client.innerHandler["s"+handle] = func(resp *Response) {
|
client.innerHandler.put("s"+handle, func(resp *Response) {
|
||||||
defer mutex.Unlock()
|
defer mutex.Unlock()
|
||||||
var err error
|
var err error
|
||||||
status, err = resp._status()
|
status, err = resp._status()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
client.err(err)
|
client.err(err)
|
||||||
}
|
}
|
||||||
}
|
})
|
||||||
req := getRequest()
|
req := getRequest()
|
||||||
req.DataType = dtGetStatus
|
req.DataType = dtGetStatus
|
||||||
req.Data = []byte(handle)
|
req.Data = []byte(handle)
|
||||||
@ -295,10 +322,10 @@ func (client *Client) Echo(data []byte) (echo []byte, err error) {
|
|||||||
}
|
}
|
||||||
var mutex sync.Mutex
|
var mutex sync.Mutex
|
||||||
mutex.Lock()
|
mutex.Lock()
|
||||||
client.innerHandler["e"] = func(resp *Response) {
|
client.innerHandler.put("e", func(resp *Response) {
|
||||||
echo = resp.Data
|
echo = resp.Data
|
||||||
mutex.Unlock()
|
mutex.Unlock()
|
||||||
}
|
})
|
||||||
req := getRequest()
|
req := getRequest()
|
||||||
req.DataType = dtEchoReq
|
req.DataType = dtEchoReq
|
||||||
req.Data = data
|
req.Data = data
|
||||||
|
Loading…
Reference in New Issue
Block a user