forked from yuxh/gearman-go
		
	Refactoring the client api
This commit is contained in:
		
							parent
							
								
									dbc06bf540
								
							
						
					
					
						commit
						c00234ba9a
					
				
							
								
								
									
										109
									
								
								client/client.go
									
									
									
									
									
								
							
							
						
						
									
										109
									
								
								client/client.go
									
									
									
									
									
								
							@ -34,6 +34,13 @@ type Client struct {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    in chan []byte
 | 
					    in chan []byte
 | 
				
			||||||
    out chan *Job
 | 
					    out chan *Job
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    created chan string
 | 
				
			||||||
 | 
					    echo chan []byte
 | 
				
			||||||
 | 
					    status chan *Status
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    jobhandlers map[string]JobHandler
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    conn net.Conn
 | 
					    conn net.Conn
 | 
				
			||||||
    addr string
 | 
					    addr string
 | 
				
			||||||
    ai *autoinc.AutoInc
 | 
					    ai *autoinc.AutoInc
 | 
				
			||||||
@ -46,7 +53,12 @@ type Client struct {
 | 
				
			|||||||
//      client, err := client.New("127.0.0.1:4730")
 | 
					//      client, err := client.New("127.0.0.1:4730")
 | 
				
			||||||
func New(addr string) (client *Client, err error) {
 | 
					func New(addr string) (client *Client, err error) {
 | 
				
			||||||
    client = &Client{
 | 
					    client = &Client{
 | 
				
			||||||
        jobCreated: make(chan *Job),
 | 
					        created: make(chan string, common.QUEUE_SIZE),
 | 
				
			||||||
 | 
					        echo: make(chan []byte, common.QUEUE_SIZE),
 | 
				
			||||||
 | 
					        status: make(chan *Status, common.QUEUE_SIZE),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        jobhandlers: make(map[string]JobHandler, common.QUEUE_SIZE),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        in: make(chan []byte, common.QUEUE_SIZE),
 | 
					        in: make(chan []byte, common.QUEUE_SIZE),
 | 
				
			||||||
        out: make(chan *Job, common.QUEUE_SIZE),
 | 
					        out: make(chan *Job, common.QUEUE_SIZE),
 | 
				
			||||||
        addr: addr,
 | 
					        addr: addr,
 | 
				
			||||||
@ -67,7 +79,7 @@ func New(addr string) (client *Client, err error) {
 | 
				
			|||||||
func (client *Client) connect() (err error) {
 | 
					func (client *Client) connect() (err error) {
 | 
				
			||||||
    client.mutex.Lock()
 | 
					    client.mutex.Lock()
 | 
				
			||||||
    defer client.mutex.Unlock()
 | 
					    defer client.mutex.Unlock()
 | 
				
			||||||
    client.conn, err = net.Dial(common.NETWORK, addr)
 | 
					    client.conn, err = net.Dial(common.NETWORK, client.addr)
 | 
				
			||||||
    return
 | 
					    return
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -121,7 +133,7 @@ func (client *Client) unpack(data []byte) ([]byte, int, bool) {
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
        if string(data[start:start+4]) == common.RES_STR {
 | 
					        if string(data[start:start+4]) == common.RES_STR {
 | 
				
			||||||
            l := int(common.BytesToUint32([4]byte{data[start+8],
 | 
					            l := int(common.BytesToUint32([4]byte{data[start+8],
 | 
				
			||||||
                data[start+9], data[start+10], data[start+11]}))
 | 
					            data[start+9], data[start+10], data[start+11]}))
 | 
				
			||||||
            total := l + common.PACKET_LEN
 | 
					            total := l + common.PACKET_LEN
 | 
				
			||||||
            if total == tl { // data is what we want
 | 
					            if total == tl { // data is what we want
 | 
				
			||||||
                return data, common.PACKET_LEN, true
 | 
					                return data, common.PACKET_LEN, true
 | 
				
			||||||
@ -198,8 +210,8 @@ func (client *Client) inLoop() {
 | 
				
			|||||||
        case common.ERROR:
 | 
					        case common.ERROR:
 | 
				
			||||||
            _, err := common.GetError(job.Data)
 | 
					            _, err := common.GetError(job.Data)
 | 
				
			||||||
            client.err(err)
 | 
					            client.err(err)
 | 
				
			||||||
        case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS,
 | 
					            case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS,
 | 
				
			||||||
        common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION:
 | 
					            common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION:
 | 
				
			||||||
            client.handleJob(job)
 | 
					            client.handleJob(job)
 | 
				
			||||||
        case common.ECHO_RES:
 | 
					        case common.ECHO_RES:
 | 
				
			||||||
            client.handleEcho(job)
 | 
					            client.handleEcho(job)
 | 
				
			||||||
@ -220,29 +232,43 @@ func (client *Client) err (e error) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// job handler
 | 
					// job handler
 | 
				
			||||||
func (client *Client) handleJob(job *Job) {
 | 
					func (client *Client) handleJob(job *Job) {
 | 
				
			||||||
 | 
					    if h, ok := client.jobhandlers[job.UniqueId]; ok {
 | 
				
			||||||
 | 
					        h(job)
 | 
				
			||||||
 | 
					        delete(client.jobhandlers, job.UniqueId)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (client *Client) handleEcho(job *Job) {
 | 
				
			||||||
 | 
					    client.echo <- job.Data
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (client *Client) handleCreated(job *Job) {
 | 
				
			||||||
 | 
					    client.created <- string(job.Data)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// status handler
 | 
					// status handler
 | 
				
			||||||
func (client *Client) handleStatus(job *Job) {
 | 
					func (client *Client) handleStatus(job *Job) {
 | 
				
			||||||
        data := bytes.SplitN(job.Data, []byte{'\x00'}, 5)
 | 
					    data := bytes.SplitN(job.Data, []byte{'\x00'}, 5)
 | 
				
			||||||
        if len(data) != 5 {
 | 
					    if len(data) != 5 {
 | 
				
			||||||
            client.err(common.Errorf("Invalid data: %V", job.Data))
 | 
					        client.err(common.Errorf("Invalid data: %V", job.Data))
 | 
				
			||||||
            return
 | 
					        return
 | 
				
			||||||
        }
 | 
					    }
 | 
				
			||||||
        handle := string(data[0])
 | 
					    status := &Status{}
 | 
				
			||||||
        known := (data[1][0] == '1')
 | 
					    status.Handle = string(data[0])
 | 
				
			||||||
        running := (data[2][0] == '1')
 | 
					    status.Known = (data[1][0] == '1')
 | 
				
			||||||
        numerator, err := strconv.ParseUint(string(data[3][0]), 10, 0)
 | 
					    status.Running = (data[2][0] == '1')
 | 
				
			||||||
        if err != nil {
 | 
					    var err error
 | 
				
			||||||
            client.err(common.Errorf("Invalid handle: %s", data[3][0]))
 | 
					    status.Numerator, err = strconv.ParseUint(string(data[3][0]), 10, 0)
 | 
				
			||||||
            return
 | 
					    if err != nil {
 | 
				
			||||||
        }
 | 
					        client.err(common.Errorf("Invalid handle: %s", data[3][0]))
 | 
				
			||||||
        denominator, err := strconv.ParseUint(string(data[4][0]), 10, 0)
 | 
					        return
 | 
				
			||||||
        if err != nil {
 | 
					    }
 | 
				
			||||||
            client.err(common.Errorf("Invalid handle: %s", data[4][0]))
 | 
					    status.Denominator, err = strconv.ParseUint(string(data[4][0]), 10, 0)
 | 
				
			||||||
            return
 | 
					    if err != nil {
 | 
				
			||||||
        }
 | 
					        client.err(common.Errorf("Invalid handle: %s", data[4][0]))
 | 
				
			||||||
 | 
					        return
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    client.status <- status
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Send the job to job server.
 | 
					// Send the job to job server.
 | 
				
			||||||
@ -251,7 +277,8 @@ func (client *Client) writeJob(job *Job) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Internal do
 | 
					// Internal do
 | 
				
			||||||
func (client *Client) do(funcname string, data []byte, flag uint32) (id int) {
 | 
					func (client *Client) do(funcname string, data []byte,
 | 
				
			||||||
 | 
					flag uint32) (id string, handle string) {
 | 
				
			||||||
    id = strconv.Itoa(int(client.ai.Id()))
 | 
					    id = strconv.Itoa(int(client.ai.Id()))
 | 
				
			||||||
    l := len(funcname) + len(id) + len(data) + 2
 | 
					    l := len(funcname) + len(id) + len(data) + 2
 | 
				
			||||||
    rel := make([]byte, 0, l)
 | 
					    rel := make([]byte, 0, l)
 | 
				
			||||||
@ -261,6 +288,8 @@ func (client *Client) do(funcname string, data []byte, flag uint32) (id int) {
 | 
				
			|||||||
    rel = append(rel, '\x00')                       // 1 Byte
 | 
					    rel = append(rel, '\x00')                       // 1 Byte
 | 
				
			||||||
    rel = append(rel, data...)                      // len(data)
 | 
					    rel = append(rel, data...)                      // len(data)
 | 
				
			||||||
    client.writeJob(newJob(common.REQ, flag, rel))
 | 
					    client.writeJob(newJob(common.REQ, flag, rel))
 | 
				
			||||||
 | 
					    // Waiting for JOB_CREATED
 | 
				
			||||||
 | 
					    handle = <-client.created
 | 
				
			||||||
    return
 | 
					    return
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -272,8 +301,8 @@ func (client *Client) do(funcname string, data []byte, flag uint32) (id int) {
 | 
				
			|||||||
// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH,
 | 
					// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH,
 | 
				
			||||||
// and if it is background job: JOB_BG.
 | 
					// and if it is background job: JOB_BG.
 | 
				
			||||||
// JOB_LOW | JOB_BG means the job is running with low level in background.
 | 
					// JOB_LOW | JOB_BG means the job is running with low level in background.
 | 
				
			||||||
func (client *Client) Do(funcname string, data []byte, flag byte)
 | 
					func (client *Client) Do(funcname string, data []byte,
 | 
				
			||||||
(handle string, err error) {
 | 
					flag byte, jobhandler JobHandler) (handle string) {
 | 
				
			||||||
    var datatype uint32
 | 
					    var datatype uint32
 | 
				
			||||||
    switch flag {
 | 
					    switch flag {
 | 
				
			||||||
    case JOB_LOW :
 | 
					    case JOB_LOW :
 | 
				
			||||||
@ -283,11 +312,16 @@ func (client *Client) Do(funcname string, data []byte, flag byte)
 | 
				
			|||||||
    default:
 | 
					    default:
 | 
				
			||||||
        datatype = common.SUBMIT_JOB
 | 
					        datatype = common.SUBMIT_JOB
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    client.do(funcname, data, datatype)
 | 
					    var id string
 | 
				
			||||||
 | 
					    id, handle = client.do(funcname, data, datatype)
 | 
				
			||||||
 | 
					    if jobhandler != nil {
 | 
				
			||||||
 | 
					        client.jobhandlers[id] = jobhandler
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    return
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (client *Client) DoBg(funcname string, data []byte, flag byte)
 | 
					func (client *Client) DoBg(funcname string, data []byte,
 | 
				
			||||||
(handle string, err error) {
 | 
					flag byte) (handle string) {
 | 
				
			||||||
    var datatype uint32
 | 
					    var datatype uint32
 | 
				
			||||||
    switch flag {
 | 
					    switch flag {
 | 
				
			||||||
    case JOB_LOW :
 | 
					    case JOB_LOW :
 | 
				
			||||||
@ -297,25 +331,34 @@ func (client *Client) DoBg(funcname string, data []byte, flag byte)
 | 
				
			|||||||
    default:
 | 
					    default:
 | 
				
			||||||
        datatype = common.SUBMIT_JOB_BG
 | 
					        datatype = common.SUBMIT_JOB_BG
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    client.do(funcname, data, datatype)
 | 
					    _, handle = client.do(funcname, data, datatype)
 | 
				
			||||||
 | 
					    return
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Get job status from job server.
 | 
					// Get job status from job server.
 | 
				
			||||||
// !!!Not fully tested.!!!
 | 
					// !!!Not fully tested.!!!
 | 
				
			||||||
func (client *Client) Status(handle string, handler StatusHandler) {
 | 
					func (client *Client) Status(handle string) (status *Status) {
 | 
				
			||||||
    client.writeJob(newJob(common.REQ, common.GET_STATUS, []byte(handle)))
 | 
					    client.writeJob(newJob(common.REQ, common.GET_STATUS, []byte(handle)))
 | 
				
			||||||
 | 
					    status = <-client.status
 | 
				
			||||||
 | 
					    return
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Send a something out, get the samething back.
 | 
					// Send a something out, get the samething back.
 | 
				
			||||||
func (client *Client) Echo(data []byte, handler JobHandler) (echo []byte) {
 | 
					func (client *Client) Echo(data []byte) (r []byte) {
 | 
				
			||||||
    client.writeJob(newJob(common.REQ, common.ECHO_REQ, data))
 | 
					    client.writeJob(newJob(common.REQ, common.ECHO_REQ, data))
 | 
				
			||||||
    client.echo
 | 
					    r = <-client.echo
 | 
				
			||||||
 | 
					    return
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Close
 | 
					// Close
 | 
				
			||||||
func (client *Client) Close() (err error) {
 | 
					func (client *Client) Close() (err error) {
 | 
				
			||||||
    close(client.in)
 | 
					    close(client.in)
 | 
				
			||||||
    close(client.out)
 | 
					    close(client.out)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    close(client.echo)
 | 
				
			||||||
 | 
					    close(client.created)
 | 
				
			||||||
 | 
					    close(client.status)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    return client.conn.Close();
 | 
					    return client.conn.Close();
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -15,22 +15,17 @@ func TestClientAddServer(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestClientEcho(t *testing.T) {
 | 
					func TestClientEcho(t *testing.T) {
 | 
				
			||||||
    echoHandler = func(job *Job) {
 | 
					    if echo := string(client.Echo([]byte("Hello world"))); echo == "Hello world" {
 | 
				
			||||||
        echo := string(job.Data)
 | 
					        t.Log(echo)
 | 
				
			||||||
        if echo == "Hello world" {
 | 
					    } else {
 | 
				
			||||||
            t.Log(echo)
 | 
					        t.Errorf("Invalid echo data: %s", echo)
 | 
				
			||||||
        } else {
 | 
					 | 
				
			||||||
            t.Errorf("Invalid echo data: %s", job.Data)
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        return
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    client.Echo([]byte("Hello world"), echoHandler)
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestClientDoBg(t *testing.T) {
 | 
					func TestClientDoBg(t *testing.T) {
 | 
				
			||||||
    if handle, err := client.DoBg("ToUpper", []byte("abcdef"),
 | 
					    if handle := client.DoBg("ToUpper", []byte("abcdef"),
 | 
				
			||||||
        JOB_LOW); err != nil {
 | 
					        JOB_LOW); handle == "" {
 | 
				
			||||||
        t.Error(err)
 | 
					        t.Error("Handle is empty.")
 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
        t.Log(handle)
 | 
					        t.Log(handle)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -46,42 +41,31 @@ func TestClientDo(t *testing.T) {
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
        return
 | 
					        return
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    if handle, err := client.Do("ToUpper", []byte("abcdef"),
 | 
					    if handle := client.Do("ToUpper", []byte("abcdef"),
 | 
				
			||||||
        JOB_LOW, jobHandler); err != nil {
 | 
					        JOB_LOW, jobHandler); handle == "" {
 | 
				
			||||||
        t.Error(err)
 | 
					        t.Error("Handle is empty.")
 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
        t.Log(handle)
 | 
					        t.Log(handle)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestClientStatus(t *testing.T) {
 | 
					func TestClientStatus(t *testing.T) {
 | 
				
			||||||
    statusHandler = func(handler string, known bool,
 | 
					
 | 
				
			||||||
        running bool, numerator uint64, denominator uint64) {
 | 
					    s1 := client.Status("handle not exists")
 | 
				
			||||||
        if known {
 | 
					    if s1.Known {
 | 
				
			||||||
            t.Errorf("The job (%s) shouldn't be known.", handler)
 | 
					        t.Errorf("The job (%s) shouldn't be known.", s1.Handle)
 | 
				
			||||||
        }
 | 
					    }
 | 
				
			||||||
        if running {
 | 
					    if s1.Running {
 | 
				
			||||||
            t.Errorf("The job (%s) shouldn't be running.", handler)
 | 
					        t.Errorf("The job (%s) shouldn't be running.", s1.Handle)
 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    client.Status("handle not exists", statusHandler)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if handle, err := client.Do("Delay5sec", []byte("abcdef"),
 | 
					    handle := client.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil);
 | 
				
			||||||
        JOB_LOW, nil); err != nil {
 | 
					    s2 := client.Status(handle)
 | 
				
			||||||
        t.Error(err)
 | 
					    if !s2.Known {
 | 
				
			||||||
    } else {
 | 
					        t.Errorf("The job (%s) should be known.", s2.Handle)
 | 
				
			||||||
        t.Log(handle)
 | 
					    }
 | 
				
			||||||
 | 
					    if s2.Running {
 | 
				
			||||||
        statusHandler = func(handler string, known bool,
 | 
					        t.Errorf("The job (%s) shouldn't be running.", s2.Handle)
 | 
				
			||||||
            running bool, numerator uint64, denominator uint64) {
 | 
					 | 
				
			||||||
            if !known {
 | 
					 | 
				
			||||||
                t.Errorf("The job (%s) shouldn be known.", handler)
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
            if !running {
 | 
					 | 
				
			||||||
                t.Errorf("The job (%s) shouldn be running.", handler)
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        client.Status(handle, statusHandler)
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										7
									
								
								client/status.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								client/status.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,7 @@
 | 
				
			|||||||
 | 
					package client
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type Status struct {
 | 
				
			||||||
 | 
					    Handle string
 | 
				
			||||||
 | 
					    Known, Running bool
 | 
				
			||||||
 | 
					    Numerator, Denominator uint64
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
		Loading…
	
		Reference in New Issue
	
	Block a user