diff --git a/goes.go b/goes.go index 61af34f..330b805 100644 --- a/goes.go +++ b/goes.go @@ -46,90 +46,83 @@ func (c *Client) WithHTTPClient(cl *http.Client) *Client { // CreateIndex creates a new index represented by a name and a mapping func (c *Client) CreateIndex(name string, mapping interface{}) (*Response, error) { r := Request{ - Conn: c, Query: mapping, IndexList: []string{name}, Method: "PUT", } - return r.Run() + return c.Do(&r) } // DeleteIndex deletes an index represented by a name func (c *Client) DeleteIndex(name string) (*Response, error) { r := Request{ - Conn: c, IndexList: []string{name}, Method: "DELETE", } - return r.Run() + return c.Do(&r) } // RefreshIndex refreshes an index represented by a name func (c *Client) RefreshIndex(name string) (*Response, error) { r := Request{ - Conn: c, IndexList: []string{name}, Method: "POST", API: "_refresh", } - return r.Run() + return c.Do(&r) } // UpdateIndexSettings updates settings for existing index represented by a name and a settings // as described here: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-update-settings.html func (c *Client) UpdateIndexSettings(name string, settings interface{}) (*Response, error) { r := Request{ - Conn: c, Query: settings, IndexList: []string{name}, Method: "PUT", API: "_settings", } - return r.Run() + return c.Do(&r) } // Optimize an index represented by a name, extra args are also allowed please check: // http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-optimize.html#indices-optimize func (c *Client) Optimize(indexList []string, extraArgs url.Values) (*Response, error) { r := Request{ - Conn: c, IndexList: indexList, ExtraArgs: extraArgs, Method: "POST", API: "_optimize", } - return r.Run() + return c.Do(&r) } // Stats fetches statistics (_stats) for the current elasticsearch server func (c *Client) Stats(indexList []string, extraArgs url.Values) (*Response, error) { r := Request{ - Conn: c, IndexList: indexList, ExtraArgs: extraArgs, Method: "GET", API: "_stats", } - return r.Run() + return c.Do(&r) } // IndexStatus fetches the status (_status) for the indices defined in // indexList. Use _all in indexList to get stats for all indices func (c *Client) IndexStatus(indexList []string) (*Response, error) { r := Request{ - Conn: c, IndexList: indexList, Method: "GET", API: "_status", } - return r.Run() + return c.Do(&r) } // BulkSend bulk adds multiple documents in bulk mode @@ -203,19 +196,33 @@ func (c *Client) BulkSend(documents []Document) (*Response, error) { bulkData[len(bulkData)-1] = []byte(nil) r := Request{ - Conn: c, Method: "POST", API: "_bulk", BulkData: bytes.Join(bulkData, []byte("\n")), } - return r.Run() + resp, err := c.Do(&r) + if err != nil { + return resp, err + } + + if resp.Errors { + for _, item := range resp.Items { + for _, i := range item { + if i.Error != "" { + return resp, &SearchError{i.Error, i.Status} + } + } + } + return resp, &SearchError{Msg: "Unknown error while bulk indexing"} + } + + return resp, err } // Search executes a search query against an index func (c *Client) Search(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) { r := Request{ - Conn: c, Query: query, IndexList: indexList, TypeList: typeList, @@ -224,13 +231,12 @@ func (c *Client) Search(query interface{}, indexList []string, typeList []string ExtraArgs: extraArgs, } - return r.Run() + return c.Do(&r) } // Count executes a count query against an index, use the Count field in the response for the result func (c *Client) Count(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) { r := Request{ - Conn: c, Query: query, IndexList: indexList, TypeList: typeList, @@ -239,7 +245,7 @@ func (c *Client) Count(query interface{}, indexList []string, typeList []string, ExtraArgs: extraArgs, } - return r.Run() + return c.Do(&r) } //Query runs a query against an index using the provided http method. @@ -247,7 +253,6 @@ func (c *Client) Count(query interface{}, indexList []string, typeList []string, //for the HTTP method. func (c *Client) Query(query interface{}, indexList []string, typeList []string, httpMethod string, extraArgs url.Values) (*Response, error) { r := Request{ - Conn: c, Query: query, IndexList: indexList, TypeList: typeList, @@ -256,7 +261,7 @@ func (c *Client) Query(query interface{}, indexList []string, typeList []string, ExtraArgs: extraArgs, } - return r.Run() + return c.Do(&r) } // Scan starts scroll over an index @@ -267,7 +272,6 @@ func (c *Client) Scan(query interface{}, indexList []string, typeList []string, v.Add("size", strconv.Itoa(size)) r := Request{ - Conn: c, Query: query, IndexList: indexList, TypeList: typeList, @@ -276,7 +280,7 @@ func (c *Client) Scan(query interface{}, indexList []string, typeList []string, ExtraArgs: v, } - return r.Run() + return c.Do(&r) } // Scroll fetches data by scroll id @@ -285,27 +289,25 @@ func (c *Client) Scroll(scrollID string, timeout string) (*Response, error) { v.Add("scroll", timeout) r := Request{ - Conn: c, Method: "POST", API: "_search/scroll", ExtraArgs: v, Body: []byte(scrollID), } - return r.Run() + return c.Do(&r) } // Get a typed document by its id func (c *Client) Get(index string, documentType string, id string, extraArgs url.Values) (*Response, error) { r := Request{ - Conn: c, IndexList: []string{index}, Method: "GET", API: documentType + "/" + id, ExtraArgs: extraArgs, } - return r.Run() + return c.Do(&r) } // Index indexes a Document @@ -313,7 +315,6 @@ func (c *Client) Get(index string, documentType string, id string, extraArgs url // URL arguments, for example, to control routing, ttl, version, op_type, etc. func (c *Client) Index(d Document, extraArgs url.Values) (*Response, error) { r := Request{ - Conn: c, Query: d.Fields, IndexList: []string{d.Index.(string)}, TypeList: []string{d.Type}, @@ -326,7 +327,7 @@ func (c *Client) Index(d Document, extraArgs url.Values) (*Response, error) { r.ID = d.ID.(string) } - return r.Run() + return c.Do(&r) } // Delete deletes a Document d @@ -334,7 +335,6 @@ func (c *Client) Index(d Document, extraArgs url.Values) (*Response, error) { // URL arguments, for example, to control routing. func (c *Client) Delete(d Document, extraArgs url.Values) (*Response, error) { r := Request{ - Conn: c, IndexList: []string{d.Index.(string)}, TypeList: []string{d.Type}, ExtraArgs: extraArgs, @@ -342,117 +342,7 @@ func (c *Client) Delete(d Document, extraArgs url.Values) (*Response, error) { ID: d.ID.(string), } - return r.Run() -} - -// Run executes an elasticsearch Request. It converts data to Json, sends the -// request and returns the Response obtained -func (req *Request) Run() (*Response, error) { - body, statusCode, err := req.run() - esResp := &Response{Status: statusCode} - - if err != nil { - return esResp, err - } - - if req.Method != "HEAD" { - err = json.Unmarshal(body, &esResp) - if err != nil { - return esResp, err - } - err = json.Unmarshal(body, &esResp.Raw) - if err != nil { - return esResp, err - } - } - - if req.API == "_bulk" && esResp.Errors { - for _, item := range esResp.Items { - for _, i := range item { - if i.Error != "" { - return esResp, &SearchError{i.Error, i.Status} - } - } - } - return esResp, &SearchError{Msg: "Unknown error while bulk indexing"} - } - - if esResp.Error != "" { - return esResp, &SearchError{esResp.Error, esResp.Status} - } - - return esResp, nil -} - -func (req *Request) run() ([]byte, uint64, error) { - postData := []byte{} - - // XXX : refactor this - if len(req.Body) > 0 { - postData = req.Body - } else if req.API == "_bulk" { - postData = req.BulkData - } else { - b, err := json.Marshal(req.Query) - if err != nil { - return nil, 0, err - } - postData = b - } - - reader := bytes.NewReader(postData) - - newReq, err := http.NewRequest(req.Method, req.URL(), reader) - if err != nil { - return nil, 0, err - } - - if req.Method == "POST" || req.Method == "PUT" { - newReq.Header.Set("Content-Type", "application/json") - } - - resp, err := req.Conn.Client.Do(newReq) - if err != nil { - return nil, 0, err - } - - defer resp.Body.Close() - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, uint64(resp.StatusCode), err - } - - if resp.StatusCode > 201 && resp.StatusCode < 400 { - return nil, uint64(resp.StatusCode), errors.New(string(body)) - } - - return body, uint64(resp.StatusCode), nil -} - -// URL builds a Request for a URL -func (req *Request) URL() string { - path := "/" + strings.Join(req.IndexList, ",") - - if len(req.TypeList) > 0 { - path += "/" + strings.Join(req.TypeList, ",") - } - - // XXX : for indexing documents using the normal (non bulk) API - if len(req.ID) > 0 { - path += "/" + req.ID - } - - path += "/" + req.API - - u := url.URL{ - Scheme: "http", - Host: fmt.Sprintf("%s:%s", req.Conn.Host, req.Conn.Port), - Path: path, - RawQuery: req.ExtraArgs.Encode(), - } - - return u.String() + return c.Do(&r) } // Buckets returns list of buckets in aggregation @@ -489,39 +379,36 @@ func (b Bucket) Aggregation(name string) Aggregation { func (c *Client) PutMapping(typeName string, mapping interface{}, indexes []string) (*Response, error) { r := Request{ - Conn: c, Query: mapping, IndexList: indexes, Method: "PUT", API: "_mappings/" + typeName, } - return r.Run() + return c.Do(&r) } // GetMapping returns the mappings for the specified types func (c *Client) GetMapping(types []string, indexes []string) (*Response, error) { r := Request{ - Conn: c, IndexList: indexes, Method: "GET", API: "_mapping/" + strings.Join(types, ","), } - return r.Run() + return c.Do(&r) } // IndicesExist checks whether index (or indices) exist on the server func (c *Client) IndicesExist(indexes []string) (bool, error) { r := Request{ - Conn: c, IndexList: indexes, Method: "HEAD", } - resp, err := r.Run() + resp, err := c.Do(&r) return resp.Status == 200, err } @@ -529,7 +416,6 @@ func (c *Client) IndicesExist(indexes []string) (bool, error) { // Update updates the specified document using the _update endpoint func (c *Client) Update(d Document, query interface{}, extraArgs url.Values) (*Response, error) { r := Request{ - Conn: c, Query: query, IndexList: []string{d.Index.(string)}, TypeList: []string{d.Type}, @@ -542,20 +428,19 @@ func (c *Client) Update(d Document, query interface{}, extraArgs url.Values) (*R r.ID = d.ID.(string) } - return r.Run() + return c.Do(&r) } // DeleteMapping deletes a mapping along with all data in the type func (c *Client) DeleteMapping(typeName string, indexes []string) (*Response, error) { r := Request{ - Conn: c, IndexList: indexes, Method: "DELETE", API: "_mappings/" + typeName, } - return r.Run() + return c.Do(&r) } func (c *Client) modifyAlias(action string, alias string, indexes []string) (*Response, error) { @@ -573,13 +458,12 @@ func (c *Client) modifyAlias(action string, alias string, indexes []string) (*Re } r := Request{ - Conn: c, Query: command, Method: "POST", API: "_aliases", } - return r.Run() + return c.Do(&r) } // AddAlias creates an alias to one or more indexes @@ -596,12 +480,64 @@ func (c *Client) RemoveAlias(alias string, indexes []string) (*Response, error) func (c *Client) AliasExists(alias string) (bool, error) { r := Request{ - Conn: c, Method: "HEAD", API: "_alias/" + alias, } - resp, err := r.Run() + resp, err := c.Do(&r) return resp.Status == 200, err } + +// Do runs the request returned by the requestor and returns the parsed response +func (c *Client) Do(r Requester) (*Response, error) { + req, err := r.Request() + if err != nil { + return &Response{}, err + } + req.URL.Scheme = "http" + req.URL.Host = fmt.Sprintf("%s:%s", c.Host, c.Port) + + body, statusCode, err := c.doRequest(req) + esResp := &Response{Status: statusCode} + + if err != nil { + return esResp, err + } + + if req.Method != "HEAD" { + err = json.Unmarshal(body, &esResp) + if err != nil { + return esResp, err + } + err = json.Unmarshal(body, &esResp.Raw) + if err != nil { + return esResp, err + } + } + + if esResp.Error != "" { + return esResp, &SearchError{esResp.Error, esResp.Status} + } + + return esResp, nil +} + +func (c *Client) doRequest(req *http.Request) ([]byte, uint64, error) { + resp, err := c.Client.Do(req) + if err != nil { + return nil, 0, err + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, uint64(resp.StatusCode), err + } + + if resp.StatusCode > 201 && resp.StatusCode < 400 { + return nil, uint64(resp.StatusCode), errors.New(string(body)) + } + + return body, uint64(resp.StatusCode), nil +} diff --git a/goes_test.go b/goes_test.go index ddd7731..cf34966 100644 --- a/goes_test.go +++ b/goes_test.go @@ -60,10 +60,9 @@ func (s *GoesTestSuite) TestWithHTTPClient(c *C) { } func (s *GoesTestSuite) TestUrl(c *C) { - conn := NewClient(ESHost, ESPort) + //conn := NewClient(ESHost, ESPort) r := Request{ - Conn: conn, Query: "q", IndexList: []string{"i"}, TypeList: []string{}, @@ -71,21 +70,21 @@ func (s *GoesTestSuite) TestUrl(c *C) { API: "_search", } - c.Assert(r.URL(), Equals, "http://"+ESHost+":"+ESPort+"/i/_search") + c.Assert(r.URL().String(), Equals, "/i/_search") r.IndexList = []string{"a", "b"} - c.Assert(r.URL(), Equals, "http://"+ESHost+":"+ESPort+"/a,b/_search") + c.Assert(r.URL().String(), Equals, "/a,b/_search") r.TypeList = []string{"c", "d"} - c.Assert(r.URL(), Equals, "http://"+ESHost+":"+ESPort+"/a,b/c,d/_search") + c.Assert(r.URL().String(), Equals, "/a,b/c,d/_search") r.ExtraArgs = make(url.Values, 1) r.ExtraArgs.Set("version", "1") - c.Assert(r.URL(), Equals, "http://"+ESHost+":"+ESPort+"/a,b/c,d/_search?version=1") + c.Assert(r.URL().String(), Equals, "/a,b/c,d/_search?version=1") r.ID = "1234" r.API = "" - c.Assert(r.URL(), Equals, "http://"+ESHost+":"+ESPort+"/a,b/c,d/1234/?version=1") + c.Assert(r.URL().String(), Equals, "/a,b/c,d/1234/?version=1") } func (s *GoesTestSuite) TestEsDown(c *C) { @@ -94,13 +93,12 @@ func (s *GoesTestSuite) TestEsDown(c *C) { var query = map[string]interface{}{"query": "foo"} r := Request{ - Conn: conn, Query: query, IndexList: []string{"i"}, Method: "GET", API: "_search", } - _, err := r.Run() + _, err := conn.Do(&r) c.Assert(err, ErrorMatches, ".* no such host") } @@ -111,13 +109,12 @@ func (s *GoesTestSuite) TestRunMissingIndex(c *C) { var query = map[string]interface{}{"query": "foo"} r := Request{ - Conn: conn, Query: query, IndexList: []string{"i"}, Method: "GET", API: "_search", } - _, err := r.Run() + _, err := conn.Do(&r) c.Assert(err.Error(), Equals, "[404] IndexMissingException[[i] missing]") } diff --git a/request.go b/request.go new file mode 100644 index 0000000..abe05a2 --- /dev/null +++ b/request.go @@ -0,0 +1,106 @@ +package goes + +import ( + "bytes" + "encoding/json" + "io/ioutil" + "net/http" + "net/url" + "strings" +) + +// Requester implements Request which builds an HTTP request for Elasticsearch +type Requester interface { + // Request should set the URL and Body (if needed). The host of the URL will be overwritten by the client. + Request() (*http.Request, error) +} + +// Request holds a single request to elasticsearch +type Request struct { + // A search query + Query interface{} + + // Which index to search into + IndexList []string + + // Which type to search into + TypeList []string + + // HTTP Method to user (GET, POST ...) + Method string + + // Which api keyword (_search, _bulk, etc) to use + API string + + // Bulk data + BulkData []byte + + // Request body + Body []byte + + // A list of extra URL arguments + ExtraArgs url.Values + + // Used for the id field when indexing a document + ID string +} + +// URL builds a URL for a Request +func (req *Request) URL() *url.URL { + path := "/" + strings.Join(req.IndexList, ",") + + if len(req.TypeList) > 0 { + path += "/" + strings.Join(req.TypeList, ",") + } + + // XXX : for indexing documents using the normal (non bulk) API + if len(req.ID) > 0 { + path += "/" + req.ID + } + + path += "/" + req.API + + u := url.URL{ + //Scheme: "http", + //Host: fmt.Sprintf("%s:%s", req.Conn.Host, req.Conn.Port), + Path: path, + RawQuery: req.ExtraArgs.Encode(), + } + + return &u +} + +// Request generates an http.Request based on the contents of the Request struct +func (req *Request) Request() (*http.Request, error) { + postData := []byte{} + + // XXX : refactor this + if len(req.Body) > 0 { + postData = req.Body + } else if req.API == "_bulk" { + postData = req.BulkData + } else { + b, err := json.Marshal(req.Query) + if err != nil { + return nil, err + } + postData = b + } + + reader := ioutil.NopCloser(bytes.NewReader(postData)) + + newReq, err := http.NewRequest(req.Method, "", nil) + if err != nil { + return nil, err + } + newReq.URL = req.URL() + newReq.Body = reader + newReq.ContentLength = int64(len(postData)) + + if req.Method == "POST" || req.Method == "PUT" { + newReq.Header.Set("Content-Type", "application/json") + } + return newReq, nil +} + +var _ Requester = (*Request)(nil) diff --git a/structs.go b/structs.go index 6aca112..dbfd686 100644 --- a/structs.go +++ b/structs.go @@ -4,10 +4,7 @@ package goes -import ( - "net/http" - "net/url" -) +import "net/http" // Client represents a connection to elasticsearch type Client struct { @@ -22,39 +19,6 @@ type Client struct { Client *http.Client } -// Request holds a single request to elasticsearch -type Request struct { - // Which connection will be used - Conn *Client - - // A search query - Query interface{} - - // Which index to search into - IndexList []string - - // Which type to search into - TypeList []string - - // HTTP Method to user (GET, POST ...) - Method string - - // Which api keyword (_search, _bulk, etc) to use - API string - - // Bulk data - BulkData []byte - - // Request body - Body []byte - - // A list of extra URL arguments - ExtraArgs url.Values - - // Used for the id field when indexing a document - ID string -} - // Response holds an elasticsearch response type Response struct { Acknowledged bool