|
|
@@ -10,34 +10,34 @@ import ( |
|
|
|
// "log" |
|
|
|
) |
|
|
|
|
|
|
|
// The client of job server. |
|
|
|
type jobClient struct { |
|
|
|
// The agent of job server. |
|
|
|
type jobAgent struct { |
|
|
|
conn net.Conn |
|
|
|
worker *Worker |
|
|
|
running bool |
|
|
|
incoming chan []byte |
|
|
|
} |
|
|
|
|
|
|
|
// Create the client of job server. |
|
|
|
func newJobClient(addr string, worker *Worker) (jobclient *jobClient, err os.Error) { |
|
|
|
// Create the agent of job server. |
|
|
|
func newJobAgent(addr string, worker *Worker) (jobagent *jobAgent, err os.Error) { |
|
|
|
conn, err := net.Dial(TCP, addr) |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
jobclient = &jobClient{conn: conn, worker: worker, running: true, incoming: make(chan []byte, QUEUE_CAP)} |
|
|
|
return jobclient, err |
|
|
|
jobagent = &jobAgent{conn: conn, worker: worker, running: true, incoming: make(chan []byte, QUEUE_CAP)} |
|
|
|
return jobagent, err |
|
|
|
} |
|
|
|
|
|
|
|
// Internal read |
|
|
|
func (client *jobClient) read() (data []byte, err os.Error) { |
|
|
|
if len(client.incoming) > 0 { |
|
|
|
func (agent *jobAgent) read() (data []byte, err os.Error) { |
|
|
|
if len(agent.incoming) > 0 { |
|
|
|
// incoming queue is not empty |
|
|
|
data = <-client.incoming |
|
|
|
data = <-agent.incoming |
|
|
|
} else { |
|
|
|
for { |
|
|
|
buf := make([]byte, BUFFER_SIZE) |
|
|
|
var n int |
|
|
|
if n, err = client.conn.Read(buf); err != nil { |
|
|
|
if n, err = agent.conn.Read(buf); err != nil { |
|
|
|
if err == os.EOF && n == 0 { |
|
|
|
err = nil |
|
|
|
return |
|
|
@@ -60,7 +60,7 @@ func (client *jobClient) read() (data []byte, err os.Error) { |
|
|
|
if total == tl { |
|
|
|
return |
|
|
|
} else { |
|
|
|
client.incoming <- data[total:] |
|
|
|
agent.incoming <- data[total:] |
|
|
|
data = data[:total] |
|
|
|
return |
|
|
|
} |
|
|
@@ -73,21 +73,21 @@ func (client *jobClient) read() (data []byte, err os.Error) { |
|
|
|
} |
|
|
|
|
|
|
|
// Main loop. |
|
|
|
func (client *jobClient) Work() { |
|
|
|
func (agent *jobAgent) Work() { |
|
|
|
noop := true |
|
|
|
for client.running { |
|
|
|
for agent.running { |
|
|
|
// got noop msg and incoming queue is zero, grab job |
|
|
|
if noop && len(client.incoming) == 0 { |
|
|
|
client.WriteJob(NewWorkerJob(REQ, GRAB_JOB, nil)) |
|
|
|
if noop && len(agent.incoming) == 0 { |
|
|
|
agent.WriteJob(NewWorkerJob(REQ, GRAB_JOB, nil)) |
|
|
|
} |
|
|
|
rel, err := client.read() |
|
|
|
rel, err := agent.read() |
|
|
|
if err != nil { |
|
|
|
client.worker.ErrQueue <- err |
|
|
|
agent.worker.ErrQueue <- err |
|
|
|
continue |
|
|
|
} |
|
|
|
job, err := DecodeWorkerJob(rel) |
|
|
|
if err != nil { |
|
|
|
client.worker.ErrQueue <- err |
|
|
|
agent.worker.ErrQueue <- err |
|
|
|
continue |
|
|
|
} else { |
|
|
|
switch job.DataType { |
|
|
@@ -95,10 +95,10 @@ func (client *jobClient) Work() { |
|
|
|
noop = true |
|
|
|
case NO_JOB: |
|
|
|
noop = false |
|
|
|
client.WriteJob(NewWorkerJob(REQ, PRE_SLEEP, nil)) |
|
|
|
agent.WriteJob(NewWorkerJob(REQ, PRE_SLEEP, nil)) |
|
|
|
case ECHO_RES, JOB_ASSIGN_UNIQ, JOB_ASSIGN: |
|
|
|
job.client = client |
|
|
|
client.worker.incoming <- job |
|
|
|
job.agent = agent |
|
|
|
agent.worker.incoming <- job |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@@ -106,15 +106,15 @@ func (client *jobClient) Work() { |
|
|
|
} |
|
|
|
|
|
|
|
// Send a job to the job server. |
|
|
|
func (client *jobClient) WriteJob(job *WorkerJob) (err os.Error) { |
|
|
|
return client.write(job.Encode()) |
|
|
|
func (agent *jobAgent) WriteJob(job *WorkerJob) (err os.Error) { |
|
|
|
return agent.write(job.Encode()) |
|
|
|
} |
|
|
|
|
|
|
|
// Internal write the encoded job. |
|
|
|
func (client *jobClient) write(buf []byte) (err os.Error) { |
|
|
|
func (agent *jobAgent) write(buf []byte) (err os.Error) { |
|
|
|
var n int |
|
|
|
for i := 0; i < len(buf); i += n { |
|
|
|
n, err = client.conn.Write(buf[i:]) |
|
|
|
n, err = agent.conn.Write(buf[i:]) |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
@@ -123,9 +123,9 @@ func (client *jobClient) write(buf []byte) (err os.Error) { |
|
|
|
} |
|
|
|
|
|
|
|
// Close. |
|
|
|
func (client *jobClient) Close() (err os.Error) { |
|
|
|
client.running = false |
|
|
|
close(client.incoming) |
|
|
|
err = client.conn.Close() |
|
|
|
func (agent *jobAgent) Close() (err os.Error) { |
|
|
|
agent.running = false |
|
|
|
close(agent.incoming) |
|
|
|
err = agent.conn.Close() |
|
|
|
return |
|
|
|
} |