From 79c4bc2e47aa74211227ba175901b9f6180122c9 Mon Sep 17 00:00:00 2001 From: mikespook Date: Tue, 5 Jun 2012 14:36:39 +0800 Subject: [PATCH 1/9] promoted the executing timeout --HG-- branch : dev --- worker/agent.go | 1 - worker/job.go | 15 ++++++++++++++- worker/worker.go | 29 +++++++++++++++++++++++++---- 3 files changed, 39 insertions(+), 6 deletions(-) diff --git a/worker/agent.go b/worker/agent.go index 66bf4c1..1e0fe45 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -72,7 +72,6 @@ func (a *agent) inLoop() { rel, err := a.read() if err != nil { if err == common.ErrConnection { - // TODO: reconnection for i:= 0; i < 3 && a.worker.running; i++ { if conn, err := net.Dial(common.NETWORK, a.addr); err != nil { a.worker.err(common.Errorf("Reconnection: %d faild", i)) diff --git a/worker/job.go b/worker/job.go index 687ffb1..71d9ce5 100644 --- a/worker/job.go +++ b/worker/job.go @@ -16,13 +16,16 @@ type Job struct { Handle, UniqueId string agent *agent magicCode, DataType uint32 + c chan bool } // Create a new job func newJob(magiccode, datatype uint32, data []byte) (job *Job) { return &Job{magicCode: magiccode, DataType: datatype, - Data: data} + Data: data, + c: make(chan bool), + } } // Decode job from byte slice @@ -88,3 +91,13 @@ func (job *Job) UpdateStatus(numerator, denominator int) { result = append(result, d...) job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result)) } + +// cancel the job executing +func (job *Job) cancel() { + job.c <- true +} + +// When a job was canceled, return a true form a channel +func (job *Job) Canceled() chan bool { + return job.c +} diff --git a/worker/worker.go b/worker/worker.go index 5d25b42..dd99c44 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -5,6 +5,7 @@ package worker import ( + "time" "bytes" "bitbucket.org/mikespook/gearman-go/common" ) @@ -271,21 +272,36 @@ func (worker *Worker) exec(job *Job) (err error) { if !ok { return common.Errorf("The function does not exist: %s", funcname) } - result, err := f.f(job) + rslt := make(chan *result) + defer close(rslt) + go func() { + defer func() {recover()}() + var r result + r.data, r.err = f.f(job) + rslt <- &r + }() + var r *result + select { + case r = <-rslt: + case <-time.After(time.Duration(f.timeout) * time.Second): + r = &result{data:nil, err: common.ErrExecTimeOut} + job.cancel() + } var datatype uint32 - if err == nil { + if r.err == nil { datatype = common.WORK_COMPLETE } else { - if result == nil { + if r.data == nil { datatype = common.WORK_FAIL } else { datatype = common.WORK_EXCEPTION } + err = r.err } job.magicCode = common.REQ job.DataType = datatype - job.Data = result + job.Data = r.data job.agent.WriteJob(job) return } @@ -300,3 +316,8 @@ func (worker *Worker) removeAgent(a *agent) { worker.err(common.ErrNoActiveAgent) } } + +type result struct { + data []byte + err error +} From 3051e6fe4bbd498e3f63508de21a25b8045873f5 Mon Sep 17 00:00:00 2001 From: mikespook Date: Tue, 12 Jun 2012 12:22:20 +0800 Subject: [PATCH 2/9] tweaking --HG-- branch : dev --- worker/job.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/worker/job.go b/worker/job.go index 71d9ce5..e3cebe0 100644 --- a/worker/job.go +++ b/worker/job.go @@ -46,15 +46,14 @@ func decodeJob(data []byte) (job *Job, err error) { // Encode a job to byte slice func (job *Job) Encode() (data []byte) { l := len(job.Data) - tl := l if job.Handle != "" { - tl += len(job.Handle) + 1 + l += len(job.Handle) + 1 } - data = make([]byte, 0, tl + 12) + data = make([]byte, 0, l + 12) magiccode := common.Uint32ToBytes(job.magicCode) datatype := common.Uint32ToBytes(job.DataType) - datalength := common.Uint32ToBytes(uint32(tl)) + datalength := common.Uint32ToBytes(uint32(l)) data = append(data, magiccode[:]...) data = append(data, datatype[:]...) From e82732a5cc4447fff4e8b4b9470471b952e6d6e2 Mon Sep 17 00:00:00 2001 From: mikespook Date: Tue, 26 Jun 2012 14:32:26 +0800 Subject: [PATCH 3/9] replace the covertion method --HG-- branch : dev --- common/gearman.go | 26 ++++++++++++++++++++++---- common/gearman_test.go | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 4 deletions(-) create mode 100644 common/gearman_test.go diff --git a/common/gearman.go b/common/gearman.go index cc9d41e..e9cc5e4 100644 --- a/common/gearman.go +++ b/common/gearman.go @@ -5,6 +5,11 @@ package common +import ( + "bytes" + "encoding/binary" +) + const ( NETWORK = "tcp" // queue size @@ -55,12 +60,25 @@ const ( // Decode [4]byte to uint32 func BytesToUint32(buf [4]byte) uint32 { - return uint32(buf[0])<<24 + uint32(buf[1])<<16 + uint32(buf[2])<<8 + - uint32(buf[3]) + var r uint32 + b := bytes.NewBuffer(buf[:]) + err := binary.Read(b, binary.BigEndian, &r) + if err != nil { + return 0 + } + return r } // Encode uint32 to [4]byte func Uint32ToBytes(i uint32) [4]byte { - return [4]byte{byte((i >> 24) & 0xff), byte((i >> 16) & 0xff), - byte((i >> 8) & 0xff), byte(i & 0xff),} + buf := new(bytes.Buffer) + err := binary.Write(buf, binary.BigEndian, i) + if err != nil { + return [4]byte{0, 0, 0, 0} + } + var r [4]byte + for k, v := range buf.Bytes() { + r[k] = v + } + return r } diff --git a/common/gearman_test.go b/common/gearman_test.go new file mode 100644 index 0000000..746b8b1 --- /dev/null +++ b/common/gearman_test.go @@ -0,0 +1,38 @@ +package common + +import ( + "bytes" + "testing" +) + +var ( + testCase = map[uint32][4]byte { + 0: [...]byte{0, 0, 0, 0}, + 1: [...]byte{0, 0, 0, 1}, + 256: [...]byte{0, 0, 1, 0}, + 256 * 256: [...]byte{0, 1, 0, 0}, + 256 * 256 * 256: [...]byte{1, 0, 0, 0}, + 256 * 256 * 256 + 256 * 256 + 256 + 1: [...]byte{1, 1, 1, 1}, + 4294967295 : [...]byte{0xFF, 0xFF, 0xFF, 0xFF}, + } +) + +func TestUint32ToBytes(t *testing.T) { + for k, v := range testCase { + b := Uint32ToBytes(k) + if bytes.Compare(b[:], v[:]) != 0 { + t.Errorf("%v was expected, but %v was got", v, b) + } + } +} + +func TestBytesToUint32s(t *testing.T) { + for k, v := range testCase { + u := BytesToUint32([4]byte(v)) + if u != k { + t.Errorf("%v was expected, but %v was got", k, u) + } + } +} + + From b7ee1d68f50581d1b1115657c7a0662142642b54 Mon Sep 17 00:00:00 2001 From: mikespook Date: Thu, 30 Aug 2012 16:12:15 +0800 Subject: [PATCH 4/9] fixed a exec issue, timeout exec need to fix --HG-- branch : dev --- example/py/client.py | 2 +- example/worker.go | 6 +++--- worker/agent.go | 22 +++++++--------------- worker/worker.go | 32 +++++++++++++++++++------------- 4 files changed, 30 insertions(+), 32 deletions(-) diff --git a/example/py/client.py b/example/py/client.py index 592f8ee..d40551e 100755 --- a/example/py/client.py +++ b/example/py/client.py @@ -16,6 +16,6 @@ def main(): check_request_status(completed_job_request) if __name__ == '__main__': - for i in range(100): + for i in range(2): main() diff --git a/example/worker.go b/example/worker.go index 18fed29..a747584 100644 --- a/example/worker.go +++ b/example/worker.go @@ -9,7 +9,7 @@ import ( ) func ToUpper(job *worker.Job) ([]byte, error) { - log.Printf("Handle=[%s]; UID=[%s], Data=[%s]\n", + log.Printf("ToUpper -- Handle=[%s]; UID=[%s], Data=[%s]\n", job.Handle, job.UniqueId, job.Data) data := []byte(strings.ToUpper(string(job.Data))) return data, nil @@ -33,8 +33,8 @@ func main() { } } w.JobHandler = func(job *worker.Job) error { - log.Printf("H=%s, UID=%s, Data=%s\n", job.Handle, - job.UniqueId, job.Data) + log.Printf("H=%s, UID=%s, Data=%s, DataType=%d\n", job.Handle, + job.UniqueId, job.Data, job.DataType) return nil } w.AddServer("127.0.0.1:4730") diff --git a/worker/agent.go b/worker/agent.go index 1e0fe45..7740acd 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -7,7 +7,6 @@ package worker import ( "io" "net" - "time" "bitbucket.org/mikespook/gearman-go/common" ) @@ -33,6 +32,8 @@ func newAgent(addr string, worker *Worker) (a *agent, err error) { in: make(chan []byte, common.QUEUE_SIZE), out: make(chan *Job, common.QUEUE_SIZE), } + // reset abilities + a.WriteJob(newJob(common.REQ, common.RESET_ABILITIES, nil)) return } @@ -52,21 +53,15 @@ func (a *agent) outLoop() { // inputing loop func (a *agent) inLoop() { defer func() { - recover() + if r := recover(); r != nil { + a.worker.err(common.Errorf("Exiting: %s", r)) + } close(a.in) close(a.out) a.worker.removeAgent(a) }() - noop := true - go func() { - for a.worker.running { - if noop && len(a.in) == 0 { - a.WriteJob(newJob(common.REQ, common.GRAB_JOB, nil)) - } - <-time.After(time.Second) - } - }() for a.worker.running { + a.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil)) RESTART: // got noop msg and in queue is zero, grab job rel, err := a.read() @@ -94,10 +89,7 @@ func (a *agent) inLoop() { } switch job.DataType { case common.NOOP: - noop = true - case common.NO_JOB: - noop = false - a.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil)) + a.WriteJob(newJob(common.REQ, common.GRAB_JOB_UNIQ, nil)) case common.ERROR, common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN: job.agent = a a.worker.in <- job diff --git a/worker/worker.go b/worker/worker.go index dd99c44..b248aaa 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -272,20 +272,26 @@ func (worker *Worker) exec(job *Job) (err error) { if !ok { return common.Errorf("The function does not exist: %s", funcname) } - rslt := make(chan *result) - defer close(rslt) - go func() { - defer func() {recover()}() - var r result + var r result + if f.timeout == 0 { r.data, r.err = f.f(job) - rslt <- &r - }() - var r *result - select { - case r = <-rslt: - case <-time.After(time.Duration(f.timeout) * time.Second): - r = &result{data:nil, err: common.ErrExecTimeOut} - job.cancel() + } else { + rslt := make(chan *result) + defer close(rslt) + go func() { + defer func() {recover()}() + var r result + r.data, r.err = f.f(job) + rslt <- &r + }() + select { + case re := <-rslt: + r.data = re.data + r.err = re.err + case <-time.After(time.Duration(f.timeout) * time.Second): + r.err = common.ErrExecTimeOut + job.cancel() + } } var datatype uint32 if r.err == nil { From adf3729627af78b0986a861a8788a2a093dc6dac Mon Sep 17 00:00:00 2001 From: mikespook Date: Thu, 30 Aug 2012 17:56:10 +0800 Subject: [PATCH 5/9] update examples --HG-- branch : dev --- example/py/client.py | 10 ++++++++-- example/worker.go | 16 ++++++++++++++-- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/example/py/client.py b/example/py/client.py index d40551e..9f2f2e1 100755 --- a/example/py/client.py +++ b/example/py/client.py @@ -15,7 +15,13 @@ def main(): completed_job_request = client.submit_job("ToUpper", "arbitrary binary data") check_request_status(completed_job_request) + completed_job_request = client.submit_job("ToUpperTimeOut5", "arbitrary binary data") + check_request_status(completed_job_request) + + completed_job_request = client.submit_job("ToUpperTimeOut20", "arbitrary binary data") + check_request_status(completed_job_request) + + if __name__ == '__main__': - for i in range(2): - main() + main() diff --git a/example/worker.go b/example/worker.go index a747584..6e06ca4 100644 --- a/example/worker.go +++ b/example/worker.go @@ -3,18 +3,29 @@ package main import ( "os" "log" + "time" "strings" "bitbucket.org/mikespook/golib/signal" "bitbucket.org/mikespook/gearman-go/worker" ) func ToUpper(job *worker.Job) ([]byte, error) { - log.Printf("ToUpper -- Handle=[%s]; UID=[%s], Data=[%s]\n", + log.Printf("ToUpper: Handle=[%s]; UID=[%s], Data=[%s]\n", job.Handle, job.UniqueId, job.Data) data := []byte(strings.ToUpper(string(job.Data))) return data, nil } +func ToUpperDelay10(job *worker.Job) ([]byte, error) { + log.Printf("ToUpperDelay10: Handle=[%s]; UID=[%s], Data=[%s]\n", + job.Handle, job.UniqueId, job.Data) + time.Sleep(10 * time.Second) + data := []byte(strings.ToUpper(string(job.Data))) + return data, nil +} + + + func main() { log.Println("Starting ...") defer log.Println("Shutdown complete!") @@ -39,7 +50,8 @@ func main() { } w.AddServer("127.0.0.1:4730") w.AddFunc("ToUpper", ToUpper, 0) - //w.AddFunc("ToUpperTimeOut5", ToUpper, 5) + w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5) + w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20) go w.Work() sh := signal.NewHandler() sh.Bind(os.Interrupt, func() bool {return true}) From fe5a31a51ed3a4a47c7ec2252fcc71478714595a Mon Sep 17 00:00:00 2001 From: mikespook Date: Sun, 2 Sep 2012 22:42:54 +0800 Subject: [PATCH 6/9] fixed resources leaking --HG-- branch : dev --- README.md | 3 ++- example/py/client.py | 22 +++++++++++----- example/worker.go | 2 +- worker/job.go | 8 +++++- worker/worker.go | 62 +++++++++++++++++++++++--------------------- 5 files changed, 59 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index 1f24a68..8053be6 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ Install both: log.Println(e) } w.AddServer("127.0.0.1:4730") - w.AddFunc("ToUpper", ToUpper, 0) + w.AddFunc("ToUpper", ToUpper, worker.Immediately) w.AddFunc("ToUpperTimeOut5", ToUpper, 5) w.Work() @@ -63,6 +63,7 @@ Xing Xing # History + * 0.1.2 Fixed issues: timeout executing, resources leaking. * 0.1.1 Fixed the issue of grabbing jobs. * 0.1 Code refactoring; Redesign the API. * 0.0.1 Initial implementation, ugly code-style, slow profermance and unstable API. diff --git a/example/py/client.py b/example/py/client.py index 9f2f2e1..1cb3777 100755 --- a/example/py/client.py +++ b/example/py/client.py @@ -12,16 +12,26 @@ def check_request_status(job_request): def main(): client = gearman.GearmanClient(['localhost:4730', 'otherhost:4730']) - completed_job_request = client.submit_job("ToUpper", "arbitrary binary data") - check_request_status(completed_job_request) + try: + completed_job_request = client.submit_job("ToUpper", "arbitrary binary data") + check_request_status(completed_job_request) + except Exception as e: + print type(e) - completed_job_request = client.submit_job("ToUpperTimeOut5", "arbitrary binary data") - check_request_status(completed_job_request) - completed_job_request = client.submit_job("ToUpperTimeOut20", "arbitrary binary data") - check_request_status(completed_job_request) + try: + completed_job_request = client.submit_job("ToUpperTimeOut5", "arbitrary binary data") + check_request_status(completed_job_request) + except Exception as e: + print type(e) + try: + completed_job_request = client.submit_job("ToUpperTimeOut20", "arbitrary binary data") + check_request_status(completed_job_request) + except Exception as e: + print type(e) + if __name__ == '__main__': main() diff --git a/example/worker.go b/example/worker.go index 6e06ca4..3975cac 100644 --- a/example/worker.go +++ b/example/worker.go @@ -49,7 +49,7 @@ func main() { return nil } w.AddServer("127.0.0.1:4730") - w.AddFunc("ToUpper", ToUpper, 0) + w.AddFunc("ToUpper", ToUpper, worker.Immediately) w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5) w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20) go w.Work() diff --git a/worker/job.go b/worker/job.go index e3cebe0..278216a 100644 --- a/worker/job.go +++ b/worker/job.go @@ -91,12 +91,18 @@ func (job *Job) UpdateStatus(numerator, denominator int) { job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result)) } +// close the job +func (job *Job) Close() { + close(job.c) +} + // cancel the job executing func (job *Job) cancel() { + defer func() {recover()}() job.c <- true } // When a job was canceled, return a true form a channel -func (job *Job) Canceled() chan bool { +func (job *Job) Canceled() <-chan bool { return job.c } diff --git a/worker/worker.go b/worker/worker.go index b248aaa..27f92a9 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -13,6 +13,8 @@ import ( const ( Unlimited = 0 OneByOne = 1 + + Immediately = 0 ) var ( @@ -21,7 +23,7 @@ var ( // Job handler type JobHandler func(*Job) error -type JobFunc func(job *Job) ([]byte, error) +type JobFunc func(*Job) ([]byte, error) // The definition of the callback function. type jobFunc struct { @@ -179,21 +181,20 @@ func (worker *Worker) Work() { var job *Job for ok { if job, ok = <-worker.in; ok { - switch job.DataType { - case common.ERROR: - go func() { + go func() { + defer job.Close() + switch job.DataType { + case common.ERROR: _, err := common.GetError(job.Data) worker.err(err) - }() - case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ: - go func() { + case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ: if err := worker.exec(job); err != nil { worker.err(err) } - }() - default: - go worker.handleJob(job) - } + default: + worker.handleJob(job) + } + }() } } } @@ -272,26 +273,12 @@ func (worker *Worker) exec(job *Job) (err error) { if !ok { return common.Errorf("The function does not exist: %s", funcname) } - var r result + var r *result if f.timeout == 0 { - r.data, r.err = f.f(job) + d, e := f.f(job) + r = &result{data:d, err: e} } else { - rslt := make(chan *result) - defer close(rslt) - go func() { - defer func() {recover()}() - var r result - r.data, r.err = f.f(job) - rslt <- &r - }() - select { - case re := <-rslt: - r.data = re.data - r.err = re.err - case <-time.After(time.Duration(f.timeout) * time.Second): - r.err = common.ErrExecTimeOut - job.cancel() - } + r = execTimeout(f.f, job, time.Duration(f.timeout) * time.Second) } var datatype uint32 if r.err == nil { @@ -327,3 +314,20 @@ type result struct { data []byte err error } + +func execTimeout(f JobFunc, job *Job, timeout time.Duration) (r *result) { + rslt := make(chan *result) + defer close(rslt) + go func() { + defer func() {recover()}() + d, e := f(job) + rslt <- &result{data: d, err: e} + }() + select { + case r = <-rslt: + case <-time.After(timeout): + go job.cancel() + return &result{err:common.ErrExecTimeOut} + } + return r +} From f74dc20e7ece63713495dc75c3e53c4d4d22b7e4 Mon Sep 17 00:00:00 2001 From: mikespook Date: Sun, 2 Sep 2012 22:43:20 +0800 Subject: [PATCH 7/9] Added tag 0.1.2 for changeset 67f11fa2301f --HG-- branch : dev --- .hgtags | 1 + 1 file changed, 1 insertion(+) diff --git a/.hgtags b/.hgtags index 4754237..0fa7935 100644 --- a/.hgtags +++ b/.hgtags @@ -27,3 +27,4 @@ dee83cac69e07ed4f3efde162d981f5855101845 go1-0.0.1 0000000000000000000000000000000000000000 0.1.1 0000000000000000000000000000000000000000 0.1.1 eea0878b43d209630d7b342b1b99c61c839b454f 0.1.1 +67f11fa2301f5e74a436883f66ce7fb121a4df82 0.1.2 From 8c67ce08306fee9c0ffb0bd701be51a7f57b20cb Mon Sep 17 00:00:00 2001 From: mikespook Date: Tue, 25 Sep 2012 14:39:50 +0800 Subject: [PATCH 8/9] WORK_FAIL fixed --HG-- branch : dev --- worker/job.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/worker/job.go b/worker/job.go index 278216a..ccbb48c 100644 --- a/worker/job.go +++ b/worker/job.go @@ -45,9 +45,14 @@ func decodeJob(data []byte) (job *Job, err error) { // Encode a job to byte slice func (job *Job) Encode() (data []byte) { - l := len(job.Data) - if job.Handle != "" { - l += len(job.Handle) + 1 + var l int + if job.DataType == common.WORK_FAIL { + l = len(job.Handle) + } else { + l = len(job.Data) + if job.Handle != "" { + l += len(job.Handle) + 1 + } } data = make([]byte, 0, l + 12) @@ -60,7 +65,9 @@ func (job *Job) Encode() (data []byte) { data = append(data, datalength[:]...) if job.Handle != "" { data = append(data, []byte(job.Handle)...) - data = append(data, 0) + if job.DataType != common.WORK_FAIL { + data = append(data, 0) + } } data = append(data, job.Data...) return From 05590bfb8caebf1bf072503d30d4956540f77888 Mon Sep 17 00:00:00 2001 From: mikespook Date: Tue, 25 Sep 2012 15:16:17 +0800 Subject: [PATCH 9/9] Info funcs added --HG-- branch : dev --- example/py/client.py | 12 ++++++++++++ example/worker.go | 2 ++ worker/func.go | 31 +++++++++++++++++++++++++++++++ 3 files changed, 45 insertions(+) create mode 100644 worker/func.go diff --git a/example/py/client.py b/example/py/client.py index 1cb3777..94b6b07 100755 --- a/example/py/client.py +++ b/example/py/client.py @@ -32,6 +32,18 @@ def main(): except Exception as e: print type(e) + try: + completed_job_request = client.submit_job("SysInfo", "") + check_request_status(completed_job_request) + except Exception as e: + print type(e) + + try: + completed_job_request = client.submit_job("MemInfo", "") + check_request_status(completed_job_request) + except Exception as e: + print type(e) + if __name__ == '__main__': main() diff --git a/example/worker.go b/example/worker.go index 3975cac..0426727 100644 --- a/example/worker.go +++ b/example/worker.go @@ -52,6 +52,8 @@ func main() { w.AddFunc("ToUpper", ToUpper, worker.Immediately) w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5) w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20) + w.AddFunc("SysInfo", worker.SysInfo, worker.Immediately) + w.AddFunc("MemInfo", worker.MemInfo, worker.Immediately) go w.Work() sh := signal.NewHandler() sh.Bind(os.Interrupt, func() bool {return true}) diff --git a/worker/func.go b/worker/func.go new file mode 100644 index 0000000..374b7dd --- /dev/null +++ b/worker/func.go @@ -0,0 +1,31 @@ +package worker + +import ( + "runtime" + "encoding/json" +) + +type systemInfo struct { + GOOS, GOARCH, GOROOT, Version string + NumCPU, NumGoroutine int + NumCgoCall int64 +} + +func SysInfo(job *Job) ([]byte, error) { + return json.Marshal(&systemInfo{ + GOOS: runtime.GOOS, + GOARCH: runtime.GOARCH, + GOROOT: runtime.GOROOT(), + Version: runtime.Version(), + NumCPU: runtime.NumCPU(), + NumGoroutine: runtime.NumGoroutine(), + NumCgoCall: runtime.NumCgoCall(), + }) +} + +var memState runtime.MemStats + +func MemInfo(job *Job) ([]byte, error) { + runtime.ReadMemStats(&memState) + return json.Marshal(&memState) +}