diff --git a/src/gearman/Makefile b/src/gearman/Makefile index bd6da01..cbead97 100644 --- a/src/gearman/Makefile +++ b/src/gearman/Makefile @@ -8,7 +8,7 @@ TARG=gearman GOFILES=\ gearman.go\ workerjob.go\ - jobclient.go\ + jobagent.go\ worker.go\ clientjob.go\ client.go\ diff --git a/src/gearman/jobclient.go b/src/gearman/jobagent.go similarity index 61% rename from src/gearman/jobclient.go rename to src/gearman/jobagent.go index 53d865e..e3e1cf6 100644 --- a/src/gearman/jobclient.go +++ b/src/gearman/jobagent.go @@ -10,34 +10,34 @@ import ( // "log" ) -// The client of job server. -type jobClient struct { +// The agent of job server. +type jobAgent struct { conn net.Conn worker *Worker running bool incoming chan []byte } -// Create the client of job server. -func newJobClient(addr string, worker *Worker) (jobclient *jobClient, err os.Error) { +// Create the agent of job server. +func newJobAgent(addr string, worker *Worker) (jobagent *jobAgent, err os.Error) { conn, err := net.Dial(TCP, addr) if err != nil { return nil, err } - jobclient = &jobClient{conn: conn, worker: worker, running: true, incoming: make(chan []byte, QUEUE_CAP)} - return jobclient, err + jobagent = &jobAgent{conn: conn, worker: worker, running: true, incoming: make(chan []byte, QUEUE_CAP)} + return jobagent, err } // Internal read -func (client *jobClient) read() (data []byte, err os.Error) { - if len(client.incoming) > 0 { +func (agent *jobAgent) read() (data []byte, err os.Error) { + if len(agent.incoming) > 0 { // incoming queue is not empty - data = <-client.incoming + data = <-agent.incoming } else { for { buf := make([]byte, BUFFER_SIZE) var n int - if n, err = client.conn.Read(buf); err != nil { + if n, err = agent.conn.Read(buf); err != nil { if err == os.EOF && n == 0 { err = nil return @@ -60,7 +60,7 @@ func (client *jobClient) read() (data []byte, err os.Error) { if total == tl { return } else { - client.incoming <- data[total:] + agent.incoming <- data[total:] data = data[:total] return } @@ -73,21 +73,21 @@ func (client *jobClient) read() (data []byte, err os.Error) { } // Main loop. -func (client *jobClient) Work() { +func (agent *jobAgent) Work() { noop := true - for client.running { + for agent.running { // got noop msg and incoming queue is zero, grab job - if noop && len(client.incoming) == 0 { - client.WriteJob(NewWorkerJob(REQ, GRAB_JOB, nil)) + if noop && len(agent.incoming) == 0 { + agent.WriteJob(NewWorkerJob(REQ, GRAB_JOB, nil)) } - rel, err := client.read() + rel, err := agent.read() if err != nil { - client.worker.ErrQueue <- err + agent.worker.ErrQueue <- err continue } job, err := DecodeWorkerJob(rel) if err != nil { - client.worker.ErrQueue <- err + agent.worker.ErrQueue <- err continue } else { switch job.DataType { @@ -95,10 +95,10 @@ func (client *jobClient) Work() { noop = true case NO_JOB: noop = false - client.WriteJob(NewWorkerJob(REQ, PRE_SLEEP, nil)) + agent.WriteJob(NewWorkerJob(REQ, PRE_SLEEP, nil)) case ECHO_RES, JOB_ASSIGN_UNIQ, JOB_ASSIGN: - job.client = client - client.worker.incoming <- job + job.agent = agent + agent.worker.incoming <- job } } } @@ -106,15 +106,15 @@ func (client *jobClient) Work() { } // Send a job to the job server. -func (client *jobClient) WriteJob(job *WorkerJob) (err os.Error) { - return client.write(job.Encode()) +func (agent *jobAgent) WriteJob(job *WorkerJob) (err os.Error) { + return agent.write(job.Encode()) } // Internal write the encoded job. -func (client *jobClient) write(buf []byte) (err os.Error) { +func (agent *jobAgent) write(buf []byte) (err os.Error) { var n int for i := 0; i < len(buf); i += n { - n, err = client.conn.Write(buf[i:]) + n, err = agent.conn.Write(buf[i:]) if err != nil { return err } @@ -123,9 +123,9 @@ func (client *jobClient) write(buf []byte) (err os.Error) { } // Close. -func (client *jobClient) Close() (err os.Error) { - client.running = false - close(client.incoming) - err = client.conn.Close() +func (agent *jobAgent) Close() (err os.Error) { + agent.running = false + close(agent.incoming) + err = agent.conn.Close() return } diff --git a/src/gearman/worker.go b/src/gearman/worker.go index ec2b8e9..99d7c86 100644 --- a/src/gearman/worker.go +++ b/src/gearman/worker.go @@ -34,7 +34,7 @@ func foobar(job *WorkerJob) (data []byte, err os.Error) { } */ type Worker struct { - clients []*jobClient + clients []*jobAgent functions JobFunctionMap running bool @@ -48,7 +48,7 @@ type Worker struct { func NewWorker() (worker *Worker) { worker = &Worker{ // job server list - clients: make([]*jobClient, 0, WORKER_SERVER_CAP), + clients: make([]*jobAgent, 0, WORKER_SERVER_CAP), // function list functions: make(JobFunctionMap), incoming: make(chan *WorkerJob, QUEUE_CAP), @@ -70,7 +70,7 @@ func (worker *Worker) AddServer(addr string) (err os.Error) { } // Create a new job server's client as a agent of server - server, err := newJobClient(addr, worker) + server, err := newJobAgent(addr, worker) if err != nil { return err } diff --git a/src/gearman/workerjob.go b/src/gearman/workerjob.go index a9b2c2d..c9549bf 100644 --- a/src/gearman/workerjob.go +++ b/src/gearman/workerjob.go @@ -13,7 +13,7 @@ import ( type WorkerJob struct { Data []byte Handle, UniqueId string - client *jobClient + agent *jobAgent magicCode, DataType uint32 } @@ -72,7 +72,7 @@ func (job *WorkerJob) UpdateData(data []byte, iswaring bool) (err os.Error) { } else { datatype = WORK_DATA } - return job.client.WriteJob(NewWorkerJob(REQ, datatype, result)) + return job.agent.WriteJob(NewWorkerJob(REQ, datatype, result)) } // Update status. @@ -83,5 +83,5 @@ func (job *WorkerJob) UpdateStatus(numerator, denominator int) (err os.Error) { result := append([]byte(job.Handle), 0) result = append(result, n...) result = append(result, d...) - return job.client.WriteJob(NewWorkerJob(REQ, WORK_STATUS, result)) + return job.agent.WriteJob(NewWorkerJob(REQ, WORK_STATUS, result)) }