forked from yuxh/gearman-go
Merge pull request #70 from JessonChan/master
concurrent map bug fixed #70
This commit is contained in:
commit
b79fee2965
@ -19,8 +19,8 @@ type Client struct {
|
||||
sync.Mutex
|
||||
|
||||
net, addr, lastcall string
|
||||
respHandler map[string]ResponseHandler
|
||||
innerHandler map[string]ResponseHandler
|
||||
respHandler *responseHandlerMap
|
||||
innerHandler *responseHandlerMap
|
||||
in chan *Response
|
||||
conn net.Conn
|
||||
rw *bufio.ReadWriter
|
||||
@ -30,13 +30,40 @@ type Client struct {
|
||||
ErrorHandler ErrorHandler
|
||||
}
|
||||
|
||||
type responseHandlerMap struct {
|
||||
sync.RWMutex
|
||||
holder map[string]ResponseHandler
|
||||
}
|
||||
|
||||
func newResponseHandlerMap() *responseHandlerMap {
|
||||
return &responseHandlerMap{holder: make(map[string]ResponseHandler, queueSize)}
|
||||
}
|
||||
|
||||
func (r *responseHandlerMap) remove(key string) {
|
||||
r.Lock()
|
||||
delete(r.holder, key)
|
||||
r.Unlock()
|
||||
}
|
||||
|
||||
func (r *responseHandlerMap) get(key string) (ResponseHandler, bool) {
|
||||
r.RLock()
|
||||
rh, b := r.holder[key]
|
||||
r.RUnlock()
|
||||
return rh, b
|
||||
}
|
||||
func (r *responseHandlerMap) put(key string, rh ResponseHandler) {
|
||||
r.Lock()
|
||||
r.holder[key] = rh
|
||||
r.Unlock()
|
||||
}
|
||||
|
||||
// Return a client.
|
||||
func New(network, addr string) (client *Client, err error) {
|
||||
client = &Client{
|
||||
net: network,
|
||||
addr: addr,
|
||||
respHandler: make(map[string]ResponseHandler, queueSize),
|
||||
innerHandler: make(map[string]ResponseHandler, queueSize),
|
||||
respHandler: newResponseHandlerMap(),
|
||||
innerHandler: newResponseHandlerMap(),
|
||||
in: make(chan *Response, queueSize),
|
||||
ResponseTimeout: DefaultTimeout,
|
||||
}
|
||||
@ -155,7 +182,7 @@ func (client *Client) processLoop() {
|
||||
resp = client.handleResponse(resp.Handle, resp)
|
||||
case dtWorkComplete, dtWorkFail, dtWorkException:
|
||||
client.handleResponse(resp.Handle, resp)
|
||||
delete(client.respHandler, resp.Handle)
|
||||
client.respHandler.remove(resp.Handle)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -167,7 +194,7 @@ func (client *Client) err(e error) {
|
||||
}
|
||||
|
||||
func (client *Client) handleResponse(key string, resp *Response) *Response {
|
||||
if h, ok := client.respHandler[key]; ok {
|
||||
if h, ok := client.respHandler.get(key); ok {
|
||||
h(resp)
|
||||
return nil
|
||||
}
|
||||
@ -175,9 +202,9 @@ func (client *Client) handleResponse(key string, resp *Response) *Response {
|
||||
}
|
||||
|
||||
func (client *Client) handleInner(key string, resp *Response) *Response {
|
||||
if h, ok := client.innerHandler[key]; ok {
|
||||
if h, ok := client.innerHandler.get(key); ok {
|
||||
h(resp)
|
||||
delete(client.innerHandler, key)
|
||||
client.innerHandler.remove(key)
|
||||
return nil
|
||||
}
|
||||
return resp
|
||||
@ -195,7 +222,7 @@ func (client *Client) do(funcname string, data []byte,
|
||||
}
|
||||
var result = make(chan handleOrError, 1)
|
||||
client.lastcall = "c"
|
||||
client.innerHandler["c"] = func(resp *Response) {
|
||||
client.innerHandler.put("c", func(resp *Response) {
|
||||
if resp.DataType == dtError {
|
||||
err = getError(resp.Data)
|
||||
result <- handleOrError{"", err}
|
||||
@ -203,12 +230,12 @@ func (client *Client) do(funcname string, data []byte,
|
||||
}
|
||||
handle = resp.Handle
|
||||
result <- handleOrError{handle, nil}
|
||||
}
|
||||
})
|
||||
id := IdGen.Id()
|
||||
req := getJob(id, []byte(funcname), data)
|
||||
req.DataType = flag
|
||||
if err = client.write(req); err != nil {
|
||||
delete(client.innerHandler, "c")
|
||||
client.innerHandler.remove("c")
|
||||
client.lastcall = ""
|
||||
return
|
||||
}
|
||||
@ -217,7 +244,7 @@ func (client *Client) do(funcname string, data []byte,
|
||||
case ret := <-result:
|
||||
return ret.handle, ret.err
|
||||
case <-timer:
|
||||
delete(client.innerHandler, "c")
|
||||
client.innerHandler.remove("c")
|
||||
client.lastcall = ""
|
||||
return "", ErrLostConn
|
||||
}
|
||||
@ -239,7 +266,7 @@ func (client *Client) Do(funcname string, data []byte,
|
||||
}
|
||||
handle, err = client.do(funcname, data, datatype)
|
||||
if err == nil && h != nil {
|
||||
client.respHandler[handle] = h
|
||||
client.respHandler.put(handle, h)
|
||||
}
|
||||
return
|
||||
}
|
||||
@ -272,14 +299,14 @@ func (client *Client) Status(handle string) (status *Status, err error) {
|
||||
var mutex sync.Mutex
|
||||
mutex.Lock()
|
||||
client.lastcall = "s" + handle
|
||||
client.innerHandler["s"+handle] = func(resp *Response) {
|
||||
client.innerHandler.put("s"+handle, func(resp *Response) {
|
||||
defer mutex.Unlock()
|
||||
var err error
|
||||
status, err = resp._status()
|
||||
if err != nil {
|
||||
client.err(err)
|
||||
}
|
||||
}
|
||||
})
|
||||
req := getRequest()
|
||||
req.DataType = dtGetStatus
|
||||
req.Data = []byte(handle)
|
||||
@ -295,10 +322,10 @@ func (client *Client) Echo(data []byte) (echo []byte, err error) {
|
||||
}
|
||||
var mutex sync.Mutex
|
||||
mutex.Lock()
|
||||
client.innerHandler["e"] = func(resp *Response) {
|
||||
client.innerHandler.put("e", func(resp *Response) {
|
||||
echo = resp.Data
|
||||
mutex.Unlock()
|
||||
}
|
||||
})
|
||||
req := getRequest()
|
||||
req.DataType = dtEchoReq
|
||||
req.Data = data
|
||||
|
Loading…
Reference in New Issue
Block a user