diff --git a/client/client.go b/client/client.go index 93a08c2..78df3f9 100644 --- a/client/client.go +++ b/client/client.go @@ -34,6 +34,13 @@ type Client struct { in chan []byte out chan *Job + + created chan string + echo chan []byte + status chan *Status + + jobhandlers map[string]JobHandler + conn net.Conn addr string ai *autoinc.AutoInc @@ -46,7 +53,12 @@ type Client struct { // client, err := client.New("127.0.0.1:4730") func New(addr string) (client *Client, err error) { client = &Client{ - jobCreated: make(chan *Job), + created: make(chan string, common.QUEUE_SIZE), + echo: make(chan []byte, common.QUEUE_SIZE), + status: make(chan *Status, common.QUEUE_SIZE), + + jobhandlers: make(map[string]JobHandler, common.QUEUE_SIZE), + in: make(chan []byte, common.QUEUE_SIZE), out: make(chan *Job, common.QUEUE_SIZE), addr: addr, @@ -67,7 +79,7 @@ func New(addr string) (client *Client, err error) { func (client *Client) connect() (err error) { client.mutex.Lock() defer client.mutex.Unlock() - client.conn, err = net.Dial(common.NETWORK, addr) + client.conn, err = net.Dial(common.NETWORK, client.addr) return } @@ -121,7 +133,7 @@ func (client *Client) unpack(data []byte) ([]byte, int, bool) { } if string(data[start:start+4]) == common.RES_STR { l := int(common.BytesToUint32([4]byte{data[start+8], - data[start+9], data[start+10], data[start+11]})) + data[start+9], data[start+10], data[start+11]})) total := l + common.PACKET_LEN if total == tl { // data is what we want return data, common.PACKET_LEN, true @@ -198,8 +210,8 @@ func (client *Client) inLoop() { case common.ERROR: _, err := common.GetError(job.Data) client.err(err) - case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS, - common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION: + case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS, + common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION: client.handleJob(job) case common.ECHO_RES: client.handleEcho(job) @@ -220,29 +232,43 @@ func (client *Client) err (e error) { // job handler func (client *Client) handleJob(job *Job) { + if h, ok := client.jobhandlers[job.UniqueId]; ok { + h(job) + delete(client.jobhandlers, job.UniqueId) + } +} +func (client *Client) handleEcho(job *Job) { + client.echo <- job.Data +} + +func (client *Client) handleCreated(job *Job) { + client.created <- string(job.Data) } // status handler func (client *Client) handleStatus(job *Job) { - data := bytes.SplitN(job.Data, []byte{'\x00'}, 5) - if len(data) != 5 { - client.err(common.Errorf("Invalid data: %V", job.Data)) - return - } - handle := string(data[0]) - known := (data[1][0] == '1') - running := (data[2][0] == '1') - numerator, err := strconv.ParseUint(string(data[3][0]), 10, 0) - if err != nil { - client.err(common.Errorf("Invalid handle: %s", data[3][0])) - return - } - denominator, err := strconv.ParseUint(string(data[4][0]), 10, 0) - if err != nil { - client.err(common.Errorf("Invalid handle: %s", data[4][0])) - return - } + data := bytes.SplitN(job.Data, []byte{'\x00'}, 5) + if len(data) != 5 { + client.err(common.Errorf("Invalid data: %V", job.Data)) + return + } + status := &Status{} + status.Handle = string(data[0]) + status.Known = (data[1][0] == '1') + status.Running = (data[2][0] == '1') + var err error + status.Numerator, err = strconv.ParseUint(string(data[3][0]), 10, 0) + if err != nil { + client.err(common.Errorf("Invalid handle: %s", data[3][0])) + return + } + status.Denominator, err = strconv.ParseUint(string(data[4][0]), 10, 0) + if err != nil { + client.err(common.Errorf("Invalid handle: %s", data[4][0])) + return + } + client.status <- status } // Send the job to job server. @@ -251,7 +277,8 @@ func (client *Client) writeJob(job *Job) { } // Internal do -func (client *Client) do(funcname string, data []byte, flag uint32) (id int) { +func (client *Client) do(funcname string, data []byte, +flag uint32) (id string, handle string) { id = strconv.Itoa(int(client.ai.Id())) l := len(funcname) + len(id) + len(data) + 2 rel := make([]byte, 0, l) @@ -261,6 +288,8 @@ func (client *Client) do(funcname string, data []byte, flag uint32) (id int) { rel = append(rel, '\x00') // 1 Byte rel = append(rel, data...) // len(data) client.writeJob(newJob(common.REQ, flag, rel)) + // Waiting for JOB_CREATED + handle = <-client.created return } @@ -272,8 +301,8 @@ func (client *Client) do(funcname string, data []byte, flag uint32) (id int) { // flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH, // and if it is background job: JOB_BG. // JOB_LOW | JOB_BG means the job is running with low level in background. -func (client *Client) Do(funcname string, data []byte, flag byte) -(handle string, err error) { +func (client *Client) Do(funcname string, data []byte, +flag byte, jobhandler JobHandler) (handle string) { var datatype uint32 switch flag { case JOB_LOW : @@ -283,11 +312,16 @@ func (client *Client) Do(funcname string, data []byte, flag byte) default: datatype = common.SUBMIT_JOB } - client.do(funcname, data, datatype) + var id string + id, handle = client.do(funcname, data, datatype) + if jobhandler != nil { + client.jobhandlers[id] = jobhandler + } + return } -func (client *Client) DoBg(funcname string, data []byte, flag byte) -(handle string, err error) { +func (client *Client) DoBg(funcname string, data []byte, +flag byte) (handle string) { var datatype uint32 switch flag { case JOB_LOW : @@ -297,25 +331,34 @@ func (client *Client) DoBg(funcname string, data []byte, flag byte) default: datatype = common.SUBMIT_JOB_BG } - client.do(funcname, data, datatype) + _, handle = client.do(funcname, data, datatype) + return } // Get job status from job server. // !!!Not fully tested.!!! -func (client *Client) Status(handle string, handler StatusHandler) { +func (client *Client) Status(handle string) (status *Status) { client.writeJob(newJob(common.REQ, common.GET_STATUS, []byte(handle))) + status = <-client.status + return } // Send a something out, get the samething back. -func (client *Client) Echo(data []byte, handler JobHandler) (echo []byte) { +func (client *Client) Echo(data []byte) (r []byte) { client.writeJob(newJob(common.REQ, common.ECHO_REQ, data)) - client.echo + r = <-client.echo + return } // Close func (client *Client) Close() (err error) { close(client.in) close(client.out) + + close(client.echo) + close(client.created) + close(client.status) + return client.conn.Close(); } diff --git a/client/client_test.go b/client/client_test.go index 56d04ae..287cea7 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -15,22 +15,17 @@ func TestClientAddServer(t *testing.T) { } func TestClientEcho(t *testing.T) { - echoHandler = func(job *Job) { - echo := string(job.Data) - if echo == "Hello world" { - t.Log(echo) - } else { - t.Errorf("Invalid echo data: %s", job.Data) - } - return + if echo := string(client.Echo([]byte("Hello world"))); echo == "Hello world" { + t.Log(echo) + } else { + t.Errorf("Invalid echo data: %s", echo) } - client.Echo([]byte("Hello world"), echoHandler) } func TestClientDoBg(t *testing.T) { - if handle, err := client.DoBg("ToUpper", []byte("abcdef"), - JOB_LOW); err != nil { - t.Error(err) + if handle := client.DoBg("ToUpper", []byte("abcdef"), + JOB_LOW); handle == "" { + t.Error("Handle is empty.") } else { t.Log(handle) } @@ -46,42 +41,31 @@ func TestClientDo(t *testing.T) { } return } - if handle, err := client.Do("ToUpper", []byte("abcdef"), - JOB_LOW, jobHandler); err != nil { - t.Error(err) + if handle := client.Do("ToUpper", []byte("abcdef"), + JOB_LOW, jobHandler); handle == "" { + t.Error("Handle is empty.") } else { t.Log(handle) } } func TestClientStatus(t *testing.T) { - statusHandler = func(handler string, known bool, - running bool, numerator uint64, denominator uint64) { - if known { - t.Errorf("The job (%s) shouldn't be known.", handler) - } - if running { - t.Errorf("The job (%s) shouldn't be running.", handler) - } + + s1 := client.Status("handle not exists") + if s1.Known { + t.Errorf("The job (%s) shouldn't be known.", s1.Handle) + } + if s1.Running { + t.Errorf("The job (%s) shouldn't be running.", s1.Handle) } - client.Status("handle not exists", statusHandler) - if handle, err := client.Do("Delay5sec", []byte("abcdef"), - JOB_LOW, nil); err != nil { - t.Error(err) - } else { - t.Log(handle) - - statusHandler = func(handler string, known bool, - running bool, numerator uint64, denominator uint64) { - if !known { - t.Errorf("The job (%s) shouldn be known.", handler) - } - if !running { - t.Errorf("The job (%s) shouldn be running.", handler) - } - } - client.Status(handle, statusHandler) + handle := client.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil); + s2 := client.Status(handle) + if !s2.Known { + t.Errorf("The job (%s) should be known.", s2.Handle) + } + if s2.Running { + t.Errorf("The job (%s) shouldn't be running.", s2.Handle) } } diff --git a/client/status.go b/client/status.go new file mode 100644 index 0000000..5671c60 --- /dev/null +++ b/client/status.go @@ -0,0 +1,7 @@ +package client + +type Status struct { + Handle string + Known, Running bool + Numerator, Denominator uint64 +}