--HG-- rename : src/pkg/gearman/job.go => src/pkg/gearman/worker/job.go rename : src/pkg/gearman/jobclient.go => src/pkg/gearman/worker/jobclient.gotags/0.0.1
@@ -7,8 +7,8 @@ include $(GOROOT)/src/Make.inc | |||||
TARG=gearman | TARG=gearman | ||||
GOFILES=\ | GOFILES=\ | ||||
gearman.go\ | gearman.go\ | ||||
job.go\ | |||||
jobclient.go\ | |||||
worker/job.go\ | |||||
worker/jobclient.go\ | |||||
worker.go\ | worker.go\ | ||||
# client.go\ | # client.go\ | ||||
@@ -4,6 +4,34 @@ const ( | |||||
TCP = "tcp4" | TCP = "tcp4" | ||||
WORKER_SERVER_CAP = 32 | WORKER_SERVER_CAP = 32 | ||||
WORKER_FUNCTION_CAP = 512 | WORKER_FUNCTION_CAP = 512 | ||||
// \x00REQ | |||||
REQ = 5391697 | |||||
// \x00RES | |||||
RES = 5391699 | |||||
CAN_DO = 1 | |||||
CANT_DO = 2 | |||||
RESET_ABILITIES = 3 | |||||
PRE_SLEEP = 4 | |||||
NOOP = 6 | |||||
GRAB_JOB = 9 | |||||
NO_JOB = 10 | |||||
JOB_ASSIGN = 11 | |||||
WORK_STATUS = 12 | |||||
WORK_COMPLETE = 13 | |||||
WORK_FAIL = 14 | |||||
ECHO_REQ = 16 | |||||
ECHO_RES = 17 | |||||
ERROR = 19 | |||||
SET_CLIENT_ID = 22 | |||||
CAN_DO_TIMEOUT = 23 | |||||
WORK_EXCEPTION = 25 | |||||
WORK_DATA = 28 | |||||
WORK_WARNING = 29 | |||||
GRAB_JOB_UNIQ = 30 | |||||
JOB_ASSIGN_UNIQ = 31 | |||||
) | ) | ||||
@@ -2,6 +2,7 @@ package gearman | |||||
import ( | import ( | ||||
"testing" | "testing" | ||||
"os" | |||||
) | ) | ||||
var worker *Worker | var worker *Worker | ||||
@@ -16,21 +17,22 @@ func TestAddServer(t *testing.T) { | |||||
t.Error(err) | t.Error(err) | ||||
} | } | ||||
if l := len(worker.servers); l != 1 { | |||||
t.Log(worker.servers) | |||||
if l := len(worker.clients); l != 1 { | |||||
t.Log(worker.clients) | |||||
t.Error("The length of server list should be 1.") | t.Error("The length of server list should be 1.") | ||||
} | } | ||||
} | } | ||||
func TestAddFunction(t *testing.T) { | |||||
f := func(job *Job) []byte { | |||||
return nil | |||||
} | |||||
func foobar(job *Job) ([]byte, os.Error) { | |||||
return nil, nil | |||||
} | |||||
if err := worker.AddFunction("foobar", f); err != nil { | |||||
func TestAddFunction(t *testing.T) { | |||||
if err := worker.AddFunction("foobar", foobar, 0); err != nil { | |||||
t.Error(err) | t.Error(err) | ||||
} | } | ||||
if err := worker.AddFunction("timeout", f); err != nil { | |||||
if err := worker.AddFunction("timeout", foobar, 5); err != nil { | |||||
t.Error(err) | t.Error(err) | ||||
} | } | ||||
if l := len(worker.functions); l != 2 { | if l := len(worker.functions); l != 2 { | ||||
@@ -40,19 +42,31 @@ func TestAddFunction(t *testing.T) { | |||||
} | } | ||||
func TestEcho(t * testing.T) { | func TestEcho(t * testing.T) { | ||||
go worker.Work() | |||||
if err := worker.Echo([]byte("Hello World")); err != nil { | if err := worker.Echo([]byte("Hello World")); err != nil { | ||||
t.Error(err) | t.Error(err) | ||||
} | } | ||||
} | } | ||||
/* | |||||
func TestResult(t *testing.T) { | func TestResult(t *testing.T) { | ||||
if job := worker.Result(); job == nil { | if job := worker.Result(); job == nil { | ||||
//t.Error("Nothing in result.") | |||||
t.Error("Nothing in result.") | |||||
} else { | } else { | ||||
t.Log(job) | t.Log(job) | ||||
} | } | ||||
} | } | ||||
*/ | |||||
func TestRemoveFunction(t * testing.T) { | |||||
if err := worker.RemoveFunction("foobar"); err != nil { | |||||
t.Error(err) | |||||
} | |||||
} | |||||
func TestReset(t * testing.T) { | |||||
if err := worker.Reset(); err != nil { | |||||
t.Error(err) | |||||
} | |||||
} | |||||
func TestClose(t *testing.T) { | func TestClose(t *testing.T) { | ||||
if err := worker.Close(); err != nil { | if err := worker.Close(); err != nil { | ||||
@@ -1,79 +0,0 @@ | |||||
package gearman | |||||
import ( | |||||
"os" | |||||
) | |||||
const ( | |||||
// \x00REQ | |||||
REQ = 5391697 | |||||
// \x00RES | |||||
RES = 5391699 | |||||
CAN_DO = 1 | |||||
CANT_DO = 2 | |||||
ECHO_REQ = 16 | |||||
ECHO_RES = 17 | |||||
ERROR = 19 | |||||
CAN_DO_TIMEOUT = 23 | |||||
) | |||||
type Job struct { | |||||
client *JobClient | |||||
Data []byte | |||||
MagicCode, DataType uint32 | |||||
} | |||||
func ByteToUint32(buf [4]byte) uint32 { | |||||
return uint32(buf[0]) << 24 + | |||||
uint32(buf[1]) << 16 + | |||||
uint32(buf[2]) << 8 + | |||||
uint32(buf[3]) | |||||
} | |||||
func Uint32ToByte(i uint32) (data [4]byte) { | |||||
data[0] = byte((i >> 24) & 0xff) | |||||
data[1] = byte((i >> 16) & 0xff) | |||||
data[2] = byte((i >> 8) & 0xff) | |||||
data[3] = byte(i & 0xff) | |||||
return | |||||
} | |||||
func NewJob(server *JobClient, magiccode, datatype uint32, data []byte) (job *Job) { | |||||
return &Job{client: server, | |||||
MagicCode:magiccode, | |||||
DataType: datatype, | |||||
Data:data} | |||||
} | |||||
func DecodeJob(server *JobClient, data []byte) (job *Job, err os.Error) { | |||||
if len(data) < 12 { | |||||
return nil, os.NewError("Data length is too small.") | |||||
} | |||||
datatype := ByteToUint32([4]byte{data[4], data[5], data[6], data[7]}) | |||||
l := ByteToUint32([4]byte{data[8], data[9], data[10], data[11]}) | |||||
if len(data[12:]) != int(l) { | |||||
return nil, os.NewError("Invalid data length.") | |||||
} | |||||
switch(ByteToUint32([4]byte{data[4], data[5], data[6], data[7]})) { | |||||
case ECHO_RES: | |||||
data = data[12:] | |||||
} | |||||
return NewJob(server, REQ, datatype, data), err | |||||
} | |||||
func (job *Job) Encode() (data []byte) { | |||||
magiccode := Uint32ToByte(job.MagicCode) | |||||
datatype := Uint32ToByte(job.DataType) | |||||
l := len(job.Data) | |||||
datalength := Uint32ToByte(uint32(l)) | |||||
data = make([]byte, 12 + l) | |||||
copy(data[:4], magiccode[:]) | |||||
copy(data[4:8], datatype[:]) | |||||
copy(data[8:12], datalength[:]) | |||||
copy(data[12:], job.Data) | |||||
return | |||||
} | |||||
@@ -1,68 +0,0 @@ | |||||
package gearman | |||||
import ( | |||||
"net" | |||||
"os" | |||||
// "log" | |||||
) | |||||
type JobClient struct { | |||||
conn net.Conn | |||||
incoming chan *Job | |||||
running bool | |||||
} | |||||
func NewJobClient(addr string, incoming chan *Job) (jobclient *JobClient, err os.Error) { | |||||
conn, err := net.Dial(TCP, addr) | |||||
if err != nil { | |||||
return nil, err | |||||
} | |||||
jobclient = &JobClient{conn: conn, incoming: incoming, running:true} | |||||
return jobclient, err | |||||
} | |||||
func (server *JobClient) Work() (err os.Error) { | |||||
for server.running { | |||||
var rel []byte | |||||
for { | |||||
buf := make([]byte, 2048) | |||||
n, err := server.conn.Read(buf) | |||||
if err != nil { | |||||
if err == os.EOF && n == 0 { | |||||
break | |||||
} | |||||
return err | |||||
} | |||||
rel = append(rel, buf[0: n] ...) | |||||
break | |||||
} | |||||
job, err := DecodeJob(server, rel) | |||||
if err != nil { | |||||
return err | |||||
} else { | |||||
server.incoming <- job | |||||
} | |||||
} | |||||
return | |||||
} | |||||
func (server *JobClient) WriteJob(job * Job) (err os.Error) { | |||||
return server.Write(job.Encode()) | |||||
} | |||||
func (server *JobClient) Write(buf []byte) (err os.Error) { | |||||
var n int | |||||
for i := 0; i < len(buf); i += n { | |||||
n, err = server.conn.Write(buf[i:]) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
} | |||||
return | |||||
} | |||||
func (server *JobClient) Close() (err os.Error) { | |||||
server.running = false | |||||
err = server.conn.Close() | |||||
return | |||||
} |
@@ -6,24 +6,29 @@ import( | |||||
"log" | "log" | ||||
) | ) | ||||
type JobFunction func(job *Job) []byte | |||||
type JobFunction func(job *Job) ([]byte, os.Error) | |||||
type JobFunctionMap map[string]JobFunction | |||||
type Worker struct { | type Worker struct { | ||||
servers []*JobClient | |||||
functions map[string]JobFunction | |||||
clients []*jobClient | |||||
functions JobFunctionMap | |||||
running bool | running bool | ||||
incoming chan *Job | incoming chan *Job | ||||
mutex sync.Mutex | mutex sync.Mutex | ||||
queue chan *Job | |||||
Queue chan *Job | |||||
} | } | ||||
func NewWorker() (worker *Worker) { | func NewWorker() (worker *Worker) { | ||||
worker = &Worker{servers:make([]*JobClient, 0, WORKER_SERVER_CAP), | |||||
functions: make(map[string]JobFunction), | |||||
worker = &Worker{ | |||||
// job server list | |||||
clients:make([]*jobClient, 0, WORKER_SERVER_CAP), | |||||
// function list | |||||
functions: make(JobFunctionMap), | |||||
incoming: make(chan *Job, 512), | incoming: make(chan *Job, 512), | ||||
queue: make(chan *Job, 512), | |||||
running: true,} | |||||
Queue: make(chan *Job, 512), | |||||
running: true, | |||||
} | |||||
return worker | return worker | ||||
} | } | ||||
@@ -33,43 +38,69 @@ func (worker * Worker) AddServer(addr string) (err os.Error) { | |||||
worker.mutex.Lock() | worker.mutex.Lock() | ||||
defer worker.mutex.Unlock() | defer worker.mutex.Unlock() | ||||
if len(worker.servers) == cap(worker.servers) { | |||||
return os.NewError("There were too many servers.") | |||||
if len(worker.clients) == cap(worker.clients) { | |||||
return os.NewError("There were too many clients.") | |||||
} | } | ||||
// Create a new job server's client as a agent of server | // Create a new job server's client as a agent of server | ||||
server, err := NewJobClient(addr, worker.incoming) | |||||
server, err := newJobClient(addr, worker.incoming) | |||||
if err != nil { | if err != nil { | ||||
return err | return err | ||||
} | } | ||||
n := len(worker.servers) | |||||
worker.servers = worker.servers[0: n + 1] | |||||
worker.servers[n] = server | |||||
n := len(worker.clients) | |||||
worker.clients = worker.clients[0: n + 1] | |||||
worker.clients[n] = server | |||||
return | return | ||||
} | } | ||||
// add function | // add function | ||||
func (worker * Worker) AddFunction(funcname string, | func (worker * Worker) AddFunction(funcname string, | ||||
f JobFunction) (err os.Error) { | |||||
worker.mutex.Lock() | |||||
defer worker.mutex.Unlock() | |||||
f JobFunction, timeout uint32) (err os.Error) { | |||||
if f == nil { | if f == nil { | ||||
return os.NewError("Job function should not be nil.") | return os.NewError("Job function should not be nil.") | ||||
} | } | ||||
if len(worker.clients) < 1 { | |||||
return os.NewError("Did not connect to Job Server.") | |||||
} | |||||
worker.mutex.Lock() | |||||
defer worker.mutex.Unlock() | |||||
worker.functions[funcname] = f | worker.functions[funcname] = f | ||||
var datatype uint32 | |||||
var data []byte | |||||
if timeout == 0 { | |||||
datatype = CAN_DO | |||||
data = []byte(funcname) | |||||
} else { | |||||
datatype = CAN_DO_TIMEOUT | |||||
data = []byte(funcname + "\x00") | |||||
t := uint32ToByte(timeout) | |||||
data = append(data, t[:] ...) | |||||
} | |||||
job := NewJob(REQ, datatype, data) | |||||
worker.WriteJob(job) | |||||
return | return | ||||
} | } | ||||
// work | |||||
func (worker * Worker) Work() { | |||||
for k, _ := range worker.functions { | |||||
job := NewJob(nil, REQ, CAN_DO, []byte(k)) | |||||
worker.Write(job) | |||||
// remove function | |||||
func (worker * Worker) RemoveFunction(funcname string) (err os.Error) { | |||||
worker.mutex.Lock() | |||||
defer worker.mutex.Unlock() | |||||
if worker.functions[funcname] == nil { | |||||
return os.NewError("No function named: " + funcname) | |||||
} | } | ||||
worker.functions[funcname] = nil, false | |||||
job := NewJob(REQ, CANT_DO, []byte(funcname)) | |||||
worker.WriteJob(job) | |||||
return | |||||
} | |||||
for _, v := range worker.servers { | |||||
// work | |||||
func (worker * Worker) Work() { | |||||
for _, v := range worker.clients { | |||||
go v.Work() | go v.Work() | ||||
} | } | ||||
for worker.running { | for worker.running { | ||||
@@ -78,47 +109,51 @@ func (worker * Worker) Work() { | |||||
if job == nil { | if job == nil { | ||||
break | break | ||||
} | } | ||||
switch job.DataType { | |||||
switch job.dataType { | |||||
case NO_JOB: | |||||
// do nothing | |||||
log.Println(job) | |||||
case ERROR: | case ERROR: | ||||
log.Panicln(string(job.Data)) | |||||
default: | |||||
if err := worker.Exec(job); err != nil { | |||||
log.Println(string(job.Data)) | |||||
case JOB_ASSIGN, JOB_ASSIGN_UNIQ: | |||||
if err := worker.exec(job); err != nil { | |||||
log.Panicln(err) | log.Panicln(err) | ||||
} | } | ||||
continue | |||||
default: | |||||
worker.Queue <- job | |||||
} | } | ||||
worker.queue <- job | |||||
} | } | ||||
} | } | ||||
} | } | ||||
func (worker * Worker) Result() (job *Job) { | func (worker * Worker) Result() (job *Job) { | ||||
if l := len(worker.queue); l != 1 { | |||||
if l := len(worker.Queue); l != 1 { | |||||
if l == 0 { | if l == 0 { | ||||
return | return | ||||
} | } | ||||
for i := 0; i < l - 1; i ++ { | for i := 0; i < l - 1; i ++ { | ||||
<-worker.queue | |||||
<-worker.Queue | |||||
} | } | ||||
} | } | ||||
return <-worker.queue | |||||
return <-worker.Queue | |||||
} | } | ||||
// Close | // Close | ||||
// should used as defer | // should used as defer | ||||
func (worker * Worker) Close() (err os.Error){ | func (worker * Worker) Close() (err os.Error){ | ||||
worker.running = false | worker.running = false | ||||
for _, v := range worker.servers { | |||||
for _, v := range worker.clients { | |||||
err = v.Close() | err = v.Close() | ||||
} | } | ||||
close(worker.incoming) | close(worker.incoming) | ||||
return err | return err | ||||
} | } | ||||
func (worker * Worker) Write(job *Job) (err os.Error) { | |||||
func (worker * Worker) WriteJob(job *Job) (err os.Error) { | |||||
e := make(chan os.Error) | e := make(chan os.Error) | ||||
for _, v := range worker.servers { | |||||
for _, v := range worker.clients { | |||||
go func() { | go func() { | ||||
job.client = v | |||||
e <- v.WriteJob(job) | e <- v.WriteJob(job) | ||||
}() | }() | ||||
} | } | ||||
@@ -127,11 +162,61 @@ func (worker * Worker) Write(job *Job) (err os.Error) { | |||||
// Echo | // Echo | ||||
func (worker * Worker) Echo(data []byte) (err os.Error) { | func (worker * Worker) Echo(data []byte) (err os.Error) { | ||||
job := NewJob(nil, REQ, ECHO_REQ, data) | |||||
return worker.Write(job) | |||||
job := NewJob(REQ, ECHO_REQ, data) | |||||
return worker.WriteJob(job) | |||||
} | |||||
// Reset | |||||
func (worker * Worker) Reset() (err os.Error){ | |||||
job := NewJob(REQ, RESET_ABILITIES, nil) | |||||
return worker.WriteJob(job) | |||||
} | |||||
// SetId | |||||
func (worker * Worker) SetId(id string) (err os.Error) { | |||||
job := NewJob(REQ, SET_CLIENT_ID, []byte(id)) | |||||
return worker.WriteJob(job) | |||||
} | } | ||||
// Exec | // Exec | ||||
func (worker * Worker) Exec(job *Job) (err os.Error) { | |||||
func (worker * Worker) exec(job *Job) (err os.Error) { | |||||
jobdata := splitByteArray(job.Data, '\x00') | |||||
job.Handle = string(jobdata[0]) | |||||
funcname := string(jobdata[1]) | |||||
if job.dataType == JOB_ASSIGN { | |||||
job.Data = jobdata[2] | |||||
} else { | |||||
job.UniqueId = string(jobdata[2]) | |||||
job.Data = jobdata[3] | |||||
} | |||||
result, err := worker.functions[funcname](job) | |||||
var datatype uint32 | |||||
if err == nil { | |||||
datatype = WORK_COMPLETE | |||||
} else{ | |||||
if result == nil { | |||||
datatype = WORK_FAIL | |||||
} else { | |||||
datatype = WORK_EXCEPTION | |||||
} | |||||
} | |||||
worker.WriteJob(NewJob(REQ, datatype, result)) | |||||
return | |||||
} | |||||
func splitByteArray(slice []byte, spot byte) (data [][]byte){ | |||||
data = make([][]byte, 0, 10) | |||||
log.Println(data) | |||||
start, end := 0, 0 | |||||
for i, v := range slice { | |||||
if v == spot { | |||||
if start != end { | |||||
data = append(data, slice[start:end]) | |||||
} | |||||
start, end = i, i | |||||
} else { | |||||
end ++ | |||||
} | |||||
} | |||||
return | return | ||||
} | } |
@@ -0,0 +1,88 @@ | |||||
package gearman | |||||
import ( | |||||
"os" | |||||
) | |||||
type Job struct { | |||||
Data []byte | |||||
Handle string | |||||
UniqueId string | |||||
client *jobClient | |||||
magicCode, dataType uint32 | |||||
} | |||||
func byteToUint32(buf [4]byte) uint32 { | |||||
return uint32(buf[0]) << 24 + | |||||
uint32(buf[1]) << 16 + | |||||
uint32(buf[2]) << 8 + | |||||
uint32(buf[3]) | |||||
} | |||||
func uint32ToByte(i uint32) (data [4]byte) { | |||||
data[0] = byte((i >> 24) & 0xff) | |||||
data[1] = byte((i >> 16) & 0xff) | |||||
data[2] = byte((i >> 8) & 0xff) | |||||
data[3] = byte(i & 0xff) | |||||
return | |||||
} | |||||
func NewJob(magiccode, datatype uint32, data []byte) (job *Job) { | |||||
return &Job{magicCode:magiccode, | |||||
dataType: datatype, | |||||
Data:data} | |||||
} | |||||
func DecodeJob(data []byte) (job *Job, err os.Error) { | |||||
if len(data) < 12 { | |||||
return nil, os.NewError("Data length is too small.") | |||||
} | |||||
datatype := byteToUint32([4]byte{data[4], data[5], data[6], data[7]}) | |||||
l := byteToUint32([4]byte{data[8], data[9], data[10], data[11]}) | |||||
if len(data[12:]) != int(l) { | |||||
return nil, os.NewError("Invalid data length.") | |||||
} | |||||
switch(byteToUint32([4]byte{data[4], data[5], data[6], data[7]})) { | |||||
case ECHO_RES: | |||||
data = data[12:] | |||||
} | |||||
return NewJob(REQ, datatype, data), err | |||||
} | |||||
func (job *Job) Encode() (data []byte) { | |||||
magiccode := uint32ToByte(job.magicCode) | |||||
datatype := uint32ToByte(job.dataType) | |||||
l := len(job.Data) | |||||
datalength := uint32ToByte(uint32(l)) | |||||
data = make([]byte, 12 + l) | |||||
copy(data[:4], magiccode[:]) | |||||
copy(data[4:8], datatype[:]) | |||||
copy(data[8:12], datalength[:]) | |||||
copy(data[12:], job.Data) | |||||
return | |||||
} | |||||
// update data | |||||
func (job * Job) UpdateData(data []byte, iswaring bool) (err os.Error) { | |||||
result := append([]byte(job.Handle), 0) | |||||
result = append(result, data ...) | |||||
var datatype uint32 | |||||
if iswaring { | |||||
datatype = WORK_WARNING | |||||
} else { | |||||
datatype = WORK_DATA | |||||
} | |||||
return job.client.WriteJob(NewJob(REQ, datatype, result)) | |||||
} | |||||
// update status | |||||
func (job * Job) UpdateStatus(numerator, denominator uint32) (err os.Error) { | |||||
n := uint32ToByte(numerator) | |||||
d := uint32ToByte(denominator) | |||||
result := append([]byte(job.Handle), 0) | |||||
result = append(result, n[:] ...) | |||||
result = append(result, d[:] ...) | |||||
return job.client.WriteJob(NewJob(REQ, WORK_STATUS, result)) | |||||
} | |||||
@@ -0,0 +1,82 @@ | |||||
package gearman | |||||
import ( | |||||
"net" | |||||
"os" | |||||
// "log" | |||||
) | |||||
type jobClient struct { | |||||
conn net.Conn | |||||
incoming chan *Job | |||||
running bool | |||||
} | |||||
func newJobClient(addr string, incoming chan *Job) (jobclient *jobClient, err os.Error) { | |||||
conn, err := net.Dial(TCP, addr) | |||||
if err != nil { | |||||
return nil, err | |||||
} | |||||
jobclient = &jobClient{conn:conn, incoming: incoming, running:true} | |||||
return jobclient, err | |||||
} | |||||
func (client *jobClient) Work() (err os.Error) { | |||||
noop := true | |||||
for client.running { | |||||
// grab job | |||||
if noop { | |||||
client.WriteJob(NewJob(REQ, GRAB_JOB, nil)) | |||||
} | |||||
var rel []byte | |||||
for { | |||||
buf := make([]byte, 2048) | |||||
n, err := client.conn.Read(buf) | |||||
if err != nil { | |||||
if err == os.EOF && n == 0 { | |||||
break | |||||
} | |||||
return err | |||||
} | |||||
rel = append(rel, buf[0: n] ...) | |||||
break | |||||
} | |||||
job, err := DecodeJob(rel) | |||||
if err != nil { | |||||
return err | |||||
} else { | |||||
switch(job.dataType) { | |||||
case NOOP: | |||||
noop = true | |||||
case NO_JOB: | |||||
noop = false | |||||
client.WriteJob(NewJob(REQ, PRE_SLEEP, nil)) | |||||
case ECHO_RES, JOB_ASSIGN_UNIQ, JOB_ASSIGN: | |||||
job.client = client | |||||
client.incoming <- job | |||||
} | |||||
} | |||||
} | |||||
return | |||||
} | |||||
func (client *jobClient) WriteJob(job * Job) (err os.Error) { | |||||
return client.Write(job.Encode()) | |||||
} | |||||
func (client *jobClient) Write(buf []byte) (err os.Error) { | |||||
var n int | |||||
for i := 0; i < len(buf); i += n { | |||||
n, err = client.conn.Write(buf[i:]) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
} | |||||
return | |||||
} | |||||
func (client *jobClient) Close() (err os.Error) { | |||||
client.running = false | |||||
err = client.conn.Close() | |||||
return | |||||
} |
@@ -1,17 +0,0 @@ | |||||
# Copyright 2009 The Go Authors. All rights reserved. | |||||
# Use of this source code is governed by a BSD-style | |||||
# license that can be found in the LICENSE file. | |||||
include $(GOROOT)/src/Make.inc | |||||
TARG=client | |||||
GOFILES=\ | |||||
client.go\ | |||||
CLEANFILES+=client | |||||
include $(GOROOT)/src/Make.pkg | |||||
%: install %.go | |||||
$(GC) $*.go | |||||
$(LD) -o $@ $*.$O |
@@ -1,74 +0,0 @@ | |||||
package main | |||||
import ( | |||||
"net" | |||||
"log" | |||||
"fmt" | |||||
) | |||||
type Client struct { | |||||
c * net.TCPConn | |||||
w chan []byte | |||||
r chan []byte | |||||
} | |||||
func NewClient() * Client { | |||||
client := new(Client) | |||||
wch := make(chan []byte, 2048) | |||||
rch := make(chan []byte, 2048) | |||||
client.w = wch | |||||
client.r = rch | |||||
addr, err := net.ResolveTCPAddr("127.0.0.1:4730") | |||||
if err != nil { | |||||
log.Panic(err.String()) | |||||
} | |||||
c, err := net.DialTCP("tcp", nil, addr) | |||||
client.c = c | |||||
if err != nil { | |||||
log.Panic(err.String()) | |||||
} | |||||
go client.ReadLoop() | |||||
go client.WriteLoop() | |||||
return client | |||||
} | |||||
func (client * Client) ReadLoop() { | |||||
for { | |||||
buf := make([]byte, 2048) | |||||
_, err := client.c.Read(buf) | |||||
if err != nil { | |||||
client.c.Close() | |||||
break; | |||||
} | |||||
client.r <- buf | |||||
} | |||||
} | |||||
func (client * Client) WriteLoop() { | |||||
for { | |||||
select { | |||||
case buf := <- client.w: | |||||
client.c.Write(buf) | |||||
} | |||||
} | |||||
} | |||||
func (client * Client) Write(data []byte) { | |||||
client.w <- data | |||||
} | |||||
func (client * Client) Read() []byte{ | |||||
var buf []byte | |||||
select { | |||||
case buf = <- client.r: | |||||
} | |||||
return buf | |||||
} | |||||
func main() { | |||||
client := NewClient() | |||||
client.Write([]byte("TEST\r\n")) | |||||
for { | |||||
fmt.Println(client.Read()) | |||||
} | |||||
} |