From dd82e211a31f9ab5fb110405b9f1e3e290f5562a Mon Sep 17 00:00:00 2001 From: JessonChan Date: Fri, 6 May 2016 18:00:58 +0800 Subject: [PATCH] concurrent map bug fixed --- client/client.go | 61 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 44 insertions(+), 17 deletions(-) diff --git a/client/client.go b/client/client.go index 907704f..368363c 100644 --- a/client/client.go +++ b/client/client.go @@ -19,8 +19,8 @@ type Client struct { sync.Mutex net, addr, lastcall string - respHandler map[string]ResponseHandler - innerHandler map[string]ResponseHandler + respHandler *responseHandlerMap + innerHandler *responseHandlerMap in chan *Response conn net.Conn rw *bufio.ReadWriter @@ -30,13 +30,40 @@ type Client struct { ErrorHandler ErrorHandler } +type responseHandlerMap struct { + sync.RWMutex + holder map[string]ResponseHandler +} + +func newResponseHandlerMap() *responseHandlerMap { + return &responseHandlerMap{holder: make(map[string]ResponseHandler, queueSize)} +} + +func (r *responseHandlerMap) remove(key string) { + r.Lock() + delete(r.holder, key) + r.Unlock() +} + +func (r *responseHandlerMap) get(key string) (ResponseHandler, bool) { + r.RLock() + rh, b := r.holder[key] + r.RUnlock() + return rh, b +} +func (r *responseHandlerMap) put(key string, rh ResponseHandler) { + r.Lock() + r.holder[key] = rh + r.Unlock() +} + // Return a client. func New(network, addr string) (client *Client, err error) { client = &Client{ net: network, addr: addr, - respHandler: make(map[string]ResponseHandler, queueSize), - innerHandler: make(map[string]ResponseHandler, queueSize), + respHandler: newResponseHandlerMap(), + innerHandler: newResponseHandlerMap(), in: make(chan *Response, queueSize), ResponseTimeout: DefaultTimeout, } @@ -155,7 +182,7 @@ func (client *Client) processLoop() { resp = client.handleResponse(resp.Handle, resp) case dtWorkComplete, dtWorkFail, dtWorkException: client.handleResponse(resp.Handle, resp) - delete(client.respHandler, resp.Handle) + client.respHandler.remove(resp.Handle) } } } @@ -167,7 +194,7 @@ func (client *Client) err(e error) { } func (client *Client) handleResponse(key string, resp *Response) *Response { - if h, ok := client.respHandler[key]; ok { + if h, ok := client.respHandler.get(key); ok { h(resp) return nil } @@ -175,9 +202,9 @@ func (client *Client) handleResponse(key string, resp *Response) *Response { } func (client *Client) handleInner(key string, resp *Response) *Response { - if h, ok := client.innerHandler[key]; ok { + if h, ok := client.innerHandler.get(key); ok { h(resp) - delete(client.innerHandler, key) + client.innerHandler.remove(key) return nil } return resp @@ -195,7 +222,7 @@ func (client *Client) do(funcname string, data []byte, } var result = make(chan handleOrError, 1) client.lastcall = "c" - client.innerHandler["c"] = func(resp *Response) { + client.innerHandler.put("c", func(resp *Response) { if resp.DataType == dtError { err = getError(resp.Data) result <- handleOrError{"", err} @@ -203,12 +230,12 @@ func (client *Client) do(funcname string, data []byte, } handle = resp.Handle result <- handleOrError{handle, nil} - } + }) id := IdGen.Id() req := getJob(id, []byte(funcname), data) req.DataType = flag if err = client.write(req); err != nil { - delete(client.innerHandler, "c") + client.innerHandler.remove("c") client.lastcall = "" return } @@ -217,7 +244,7 @@ func (client *Client) do(funcname string, data []byte, case ret := <-result: return ret.handle, ret.err case <-timer: - delete(client.innerHandler, "c") + client.innerHandler.remove("c") client.lastcall = "" return "", ErrLostConn } @@ -239,7 +266,7 @@ func (client *Client) Do(funcname string, data []byte, } handle, err = client.do(funcname, data, datatype) if err == nil && h != nil { - client.respHandler[handle] = h + client.respHandler.put(handle, h) } return } @@ -272,14 +299,14 @@ func (client *Client) Status(handle string) (status *Status, err error) { var mutex sync.Mutex mutex.Lock() client.lastcall = "s" + handle - client.innerHandler["s"+handle] = func(resp *Response) { + client.innerHandler.put("s"+handle, func(resp *Response) { defer mutex.Unlock() var err error status, err = resp._status() if err != nil { client.err(err) } - } + }) req := getRequest() req.DataType = dtGetStatus req.Data = []byte(handle) @@ -295,10 +322,10 @@ func (client *Client) Echo(data []byte) (echo []byte, err error) { } var mutex sync.Mutex mutex.Lock() - client.innerHandler["e"] = func(resp *Response) { + client.innerHandler.put("e", func(resp *Response) { echo = resp.Data mutex.Unlock() - } + }) req := getRequest() req.DataType = dtEchoReq req.Data = data