forked from yuxh/gearman-go
Merge pull request #96 from floringavrila/add-support-for-own-ids
add support for own ids (coalescing)
This commit is contained in:
commit
2a518e8661
@ -217,7 +217,10 @@ type handleOrError struct {
|
||||
}
|
||||
|
||||
func (client *Client) do(funcname string, data []byte,
|
||||
flag uint32, h ResponseHandler) (handle string, err error) {
|
||||
flag uint32, h ResponseHandler, id string) (handle string, err error) {
|
||||
if len(id) == 0 {
|
||||
return "", ErrInvalidId
|
||||
}
|
||||
if client.conn == nil {
|
||||
return "", ErrLostConn
|
||||
}
|
||||
@ -233,7 +236,6 @@ func (client *Client) do(funcname string, data []byte,
|
||||
handle = resp.Handle
|
||||
result <- handleOrError{handle, nil}
|
||||
}, h)
|
||||
id := IdGen.Id()
|
||||
req := getJob(id, []byte(funcname), data)
|
||||
req.DataType = flag
|
||||
if err = client.write(req); err != nil {
|
||||
@ -255,17 +257,7 @@ func (client *Client) do(funcname string, data []byte,
|
||||
// flag can be set to: JobLow, JobNormal and JobHigh
|
||||
func (client *Client) Do(funcname string, data []byte,
|
||||
flag byte, h ResponseHandler) (handle string, err error) {
|
||||
var datatype uint32
|
||||
switch flag {
|
||||
case JobLow:
|
||||
datatype = dtSubmitJobLow
|
||||
case JobHigh:
|
||||
datatype = dtSubmitJobHigh
|
||||
default:
|
||||
datatype = dtSubmitJob
|
||||
}
|
||||
|
||||
handle, err = client.do(funcname, data, datatype, h)
|
||||
handle, err = client.DoWithId(funcname, data, flag, h, IdGen.Id())
|
||||
return
|
||||
}
|
||||
|
||||
@ -273,19 +265,7 @@ func (client *Client) Do(funcname string, data []byte,
|
||||
// flag can be set to: JobLow, JobNormal and JobHigh
|
||||
func (client *Client) DoBg(funcname string, data []byte,
|
||||
flag byte) (handle string, err error) {
|
||||
if client.conn == nil {
|
||||
return "", ErrLostConn
|
||||
}
|
||||
var datatype uint32
|
||||
switch flag {
|
||||
case JobLow:
|
||||
datatype = dtSubmitJobLowBg
|
||||
case JobHigh:
|
||||
datatype = dtSubmitJobHighBg
|
||||
default:
|
||||
datatype = dtSubmitJobBg
|
||||
}
|
||||
handle, err = client.do(funcname, data, datatype, nil)
|
||||
handle, err = client.DoBgWithId(funcname, data, flag, IdGen.Id())
|
||||
return
|
||||
}
|
||||
|
||||
@ -341,3 +321,40 @@ func (client *Client) Close() (err error) {
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Call the function and get a response.
|
||||
// flag can be set to: JobLow, JobNormal and JobHigh
|
||||
func (client *Client) DoWithId(funcname string, data []byte,
|
||||
flag byte, h ResponseHandler, id string) (handle string, err error) {
|
||||
var datatype uint32
|
||||
switch flag {
|
||||
case JobLow:
|
||||
datatype = dtSubmitJobLow
|
||||
case JobHigh:
|
||||
datatype = dtSubmitJobHigh
|
||||
default:
|
||||
datatype = dtSubmitJob
|
||||
}
|
||||
handle, err = client.do(funcname, data, datatype, h, id)
|
||||
return
|
||||
}
|
||||
|
||||
// Call the function in background, no response needed.
|
||||
// flag can be set to: JobLow, JobNormal and JobHigh
|
||||
func (client *Client) DoBgWithId(funcname string, data []byte,
|
||||
flag byte, id string) (handle string, err error) {
|
||||
if client.conn == nil {
|
||||
return "", ErrLostConn
|
||||
}
|
||||
var datatype uint32
|
||||
switch flag {
|
||||
case JobLow:
|
||||
datatype = dtSubmitJobLowBg
|
||||
case JobHigh:
|
||||
datatype = dtSubmitJobHighBg
|
||||
default:
|
||||
datatype = dtSubmitJobBg
|
||||
}
|
||||
handle, err = client.do(funcname, data, datatype, nil, id)
|
||||
return
|
||||
}
|
||||
|
@ -1,8 +1,11 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
@ -72,6 +75,42 @@ func TestClientDoBg(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientDoBgWithId(t *testing.T) {
|
||||
if !runIntegrationTests {
|
||||
t.Skip("To run this test, use: go test -integration")
|
||||
}
|
||||
data := []byte("abcdef")
|
||||
hash := md5.Sum(data)
|
||||
id := hex.EncodeToString(hash[:])
|
||||
handle, err := client.DoBgWithId("ToUpper", data, JobLow, id)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if handle == "" {
|
||||
t.Error("Handle is empty.")
|
||||
} else {
|
||||
t.Log(handle)
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientDoBgWithIdFailsIfNoId(t *testing.T) {
|
||||
if !runIntegrationTests {
|
||||
t.Skip("To run this test, use: go test -integration")
|
||||
}
|
||||
data := []byte("abcdef")
|
||||
id := ""
|
||||
_, err := client.DoBgWithId("ToUpper", data, JobLow, id)
|
||||
if err == nil {
|
||||
t.Error("Expecting error")
|
||||
return
|
||||
}
|
||||
if err.Error() != "Invalid ID" {
|
||||
t.Error(fmt.Sprintf("Expecting \"Invalid ID\" error, got %s.", err.Error()))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientDo(t *testing.T) {
|
||||
if !runIntegrationTests {
|
||||
t.Skip("To run this test, use: go test -integration")
|
||||
@ -98,6 +137,138 @@ func TestClientDo(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientDoWithId(t *testing.T) {
|
||||
if !runIntegrationTests {
|
||||
t.Skip("To run this test, use: go test -integration")
|
||||
}
|
||||
jobHandler := func(job *Response) {
|
||||
str := string(job.Data)
|
||||
if str == "ABCDEF" {
|
||||
t.Log(str)
|
||||
} else {
|
||||
t.Errorf("Invalid data: %s", job.Data)
|
||||
}
|
||||
return
|
||||
}
|
||||
data := []byte("abcdef")
|
||||
hash := md5.Sum(data)
|
||||
id := hex.EncodeToString(hash[:])
|
||||
handle, err := client.DoWithId("ToUpper", data,
|
||||
JobLow, jobHandler, id)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if handle == "" {
|
||||
t.Error("Handle is empty.")
|
||||
} else {
|
||||
t.Log(handle)
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientDoWithIdFailsIfNoId(t *testing.T) {
|
||||
if !runIntegrationTests {
|
||||
t.Skip("To run this test, use: go test -integration")
|
||||
}
|
||||
jobHandler := func(job *Response) {
|
||||
str := string(job.Data)
|
||||
if str == "ABCDEF" {
|
||||
t.Log(str)
|
||||
} else {
|
||||
t.Errorf("Invalid data: %s", job.Data)
|
||||
}
|
||||
return
|
||||
}
|
||||
data := []byte("abcdef")
|
||||
id := ""
|
||||
_, err := client.DoWithId("ToUpper", data,
|
||||
JobLow, jobHandler, id)
|
||||
if err == nil {
|
||||
t.Error("Expecting error")
|
||||
return
|
||||
}
|
||||
if err.Error() != "Invalid ID" {
|
||||
t.Error(fmt.Sprintf("Expecting \"Invalid ID\" error, got %s.", err.Error()))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientDoWithIdCheckSameHandle(t *testing.T) {
|
||||
if !runIntegrationTests {
|
||||
t.Skip("To run this test, use: go test -integration")
|
||||
}
|
||||
jobHandler := func(job *Response) {
|
||||
return
|
||||
}
|
||||
data := []byte("{productId:123,categoryId:1}")
|
||||
id := "123"
|
||||
handle1, err := client.DoWithId("PublishProduct", data,
|
||||
JobLow, jobHandler, id)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if handle1 == "" {
|
||||
t.Error("Handle is empty.")
|
||||
} else {
|
||||
t.Log(handle1)
|
||||
}
|
||||
|
||||
handle2, err := client.DoWithId("PublishProduct", data,
|
||||
JobLow, jobHandler, id)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if handle2 == "" {
|
||||
t.Error("Handle is empty.")
|
||||
} else {
|
||||
t.Log(handle2)
|
||||
}
|
||||
|
||||
if handle1 != handle2 {
|
||||
t.Error("expecting the same handle when using the same id on the same Job name")
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientDoWithIdCheckDifferentHandleOnDifferentJobs(t *testing.T) {
|
||||
if !runIntegrationTests {
|
||||
t.Skip("To run this test, use: go test -integration")
|
||||
}
|
||||
jobHandler := func(job *Response) {
|
||||
return
|
||||
}
|
||||
data := []byte("{productId:123}")
|
||||
id := "123"
|
||||
handle1, err := client.DoWithId("PublishProduct", data,
|
||||
JobLow, jobHandler, id)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if handle1 == "" {
|
||||
t.Error("Handle is empty.")
|
||||
} else {
|
||||
t.Log(handle1)
|
||||
}
|
||||
|
||||
handle2, err := client.DoWithId("DeleteProduct", data,
|
||||
JobLow, jobHandler, id)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if handle2 == "" {
|
||||
t.Error("Handle is empty.")
|
||||
} else {
|
||||
t.Log(handle2)
|
||||
}
|
||||
|
||||
if handle1 == handle2 {
|
||||
t.Error("expecting different handles because there are different job names")
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientMultiDo(t *testing.T) {
|
||||
if !runIntegrationTests {
|
||||
t.Skip("To run this test, use: go test -integration")
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
var (
|
||||
ErrWorkWarning = errors.New("Work warning")
|
||||
ErrInvalidData = errors.New("Invalid data")
|
||||
ErrInvalidId = errors.New("Invalid ID")
|
||||
ErrWorkFail = errors.New("Work fail")
|
||||
ErrWorkException = errors.New("Work exeption")
|
||||
ErrDataType = errors.New("Invalid data type")
|
||||
|
Loading…
Reference in New Issue
Block a user