From b684e6945199beb4ececde6e62841104e16afd0d Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Wed, 1 Feb 2017 16:59:14 -0600 Subject: [PATCH 01/23] Use slices a bit more nicely --- goes.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/goes.go b/goes.go index 94b3fe4..cc7a9d3 100644 --- a/goes.go +++ b/goes.go @@ -145,7 +145,7 @@ func (c *Client) BulkSend(documents []Document) (*Response, error) { // len(documents) * 2 : action + optional_sources // + 1 : room for the trailing \n - bulkData := make([][]byte, len(documents)*2+1) + bulkData := make([][]byte, 0, len(documents)*2+1) i := 0 for _, doc := range documents { @@ -161,7 +161,7 @@ func (c *Client) BulkSend(documents []Document) (*Response, error) { return &Response{}, err } - bulkData[i] = action + bulkData = append(bulkData, action) i++ if doc.Fields != nil { @@ -187,13 +187,13 @@ func (c *Client) BulkSend(documents []Document) (*Response, error) { return &Response{}, err } - bulkData[i] = sources + bulkData = append(bulkData, sources) i++ } } // forces an extra trailing \n absolutely necessary for elasticsearch - bulkData[len(bulkData)-1] = []byte(nil) + bulkData = append(bulkData, []byte(nil)) r := Request{ Method: "POST", @@ -445,7 +445,7 @@ func (c *Client) DeleteMapping(typeName string, indexes []string) (*Response, er func (c *Client) modifyAlias(action string, alias string, indexes []string) (*Response, error) { command := map[string]interface{}{ - "actions": make([]map[string]interface{}, 1), + "actions": make([]map[string]interface{}, 0, 1), } for _, index := range indexes { From a1af556756e516d33cd9a4245c02ff431d164dfd Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Wed, 1 Feb 2017 16:59:43 -0600 Subject: [PATCH 02/23] Zero out some fields which may vary between ES versions --- goes_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/goes_test.go b/goes_test.go index 1abe586..24e7a1e 100644 --- a/goes_test.go +++ b/goes_test.go @@ -540,6 +540,8 @@ func (s *GoesTestSuite) TestDelete(c *C) { Version: 2, } response.Raw = nil + response.Shards.Total = 0 + response.Shards.Successful = 0 c.Assert(response, DeepEquals, expectedResponse) response, err = conn.Delete(d, url.Values{}) @@ -616,6 +618,8 @@ func (s *GoesTestSuite) TestDeleteByQuery(c *C) { Version: 0, } response.Raw = nil + response.Shards.Total = 0 + response.Shards.Successful = 0 c.Assert(response, DeepEquals, expectedResponse) //should be 0 docs after delete by query @@ -1178,6 +1182,8 @@ func (s *GoesTestSuite) TestUpdate(c *C) { } response.Raw = nil + response.Shards.Successful = 0 + response.Shards.Total = 0 c.Assert(response, DeepEquals, expectedResponse) // Now that we have an ordinary document indexed, try updating it From f5716dce831d0ff92e2a4063f995cfa175e6a03f Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Thu, 2 Feb 2017 11:01:27 -0600 Subject: [PATCH 03/23] Some 5.x query changes --- goes_test.go | 90 ++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 70 insertions(+), 20 deletions(-) diff --git a/goes_test.go b/goes_test.go index 24e7a1e..869f087 100644 --- a/goes_test.go +++ b/goes_test.go @@ -5,6 +5,8 @@ package goes import ( + "encoding/json" + "fmt" "net/http" "net/url" "os" @@ -16,8 +18,9 @@ import ( ) var ( - ESHost = "localhost" - ESPort = "9200" + ESHost = "localhost" + ESPort = "9200" + ESVersion = "0.0.0" ) // Hook up gocheck into the gotest runner. @@ -37,6 +40,19 @@ func (s *GoesTestSuite) SetUpTest(c *C) { if p != "" { ESPort = p } + ESVersion = getESVersion(c, ESHost, ESPort) +} + +func getESVersion(c *C, host, port string) string { + res, err := http.Get(fmt.Sprintf("http://%s:%s/", host, port)) + c.Assert(err, Equals, nil) + defer res.Body.Close() + decoder := json.NewDecoder(res.Body) + + var info map[string]interface{} + err = decoder.Decode(&info) + c.Assert(err, Equals, nil) + return info["version"].(map[string]interface{})["number"].(string) } func (s *GoesTestSuite) TestNewClient(c *C) { @@ -928,16 +944,31 @@ func (s *GoesTestSuite) TestScroll(c *C) { _, err = conn.RefreshIndex(indexName) c.Assert(err, IsNil) - query := map[string]interface{}{ - "query": map[string]interface{}{ - "filtered": map[string]interface{}{ - "filter": map[string]interface{}{ - "term": map[string]interface{}{ - "user": "foo", + var query map[string]interface{} + if ESVersion > "5" { + query = map[string]interface{}{ + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "filter": map[string]interface{}{ + "term": map[string]interface{}{ + "user": "foo", + }, }, }, }, - }, + } + } else { + query = map[string]interface{}{ + "query": map[string]interface{}{ + "filtered": map[string]interface{}{ + "filter": map[string]interface{}{ + "term": map[string]interface{}{ + "user": "foo", + }, + }, + }, + }, + } } scan, err := conn.Scan(query, []string{indexName}, []string{docType}, "1m", 1) @@ -1187,20 +1218,39 @@ func (s *GoesTestSuite) TestUpdate(c *C) { c.Assert(response, DeepEquals, expectedResponse) // Now that we have an ordinary document indexed, try updating it - query := map[string]interface{}{ - "script": "ctx._source.counter += count", - "lang": "groovy", - "params": map[string]interface{}{ - "count": 5, - }, - "upsert": map[string]interface{}{ - "message": "candybar", - "user": "admin", - "counter": 1, - }, + var query map[string]interface{} + if ESVersion > "5" { + query = map[string]interface{}{ + "script": map[string]interface{}{ + "inline": "ctx._source.counter += params.count", + "lang": "painless", + "params": map[string]interface{}{ + "count": 5, + }, + }, + "upsert": map[string]interface{}{ + "message": "candybar", + "user": "admin", + "counter": 1, + }, + } + } else { + query = map[string]interface{}{ + "script": "ctx._source.counter += count", + "lang": "groovy", + "params": map[string]interface{}{ + "count": 5, + }, + "upsert": map[string]interface{}{ + "message": "candybar", + "user": "admin", + "counter": 1, + }, + } } response, err = conn.Update(d, query, extraArgs) + fmt.Println(response) if err != nil && strings.Contains(err.(*SearchError).Msg, "dynamic scripting") { c.Skip("Scripting is disabled on server, skipping this test") return From f9192a7ca89ca803a82910c375d0eb1d368733f2 Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Thu, 2 Feb 2017 11:54:24 -0600 Subject: [PATCH 04/23] Build in a way to detect the ES version so it can be used for version-specific things --- goes.go | 24 +++++++++++++++++++++++- goes_test.go | 27 ++++++--------------------- structs.go | 3 +++ 3 files changed, 32 insertions(+), 22 deletions(-) diff --git a/goes.go b/goes.go index cc7a9d3..edc844e 100644 --- a/goes.go +++ b/goes.go @@ -34,7 +34,7 @@ func (err *SearchError) Error() string { // This function is pretty useless for now but might be useful in a near future // if wee need more features like connection pooling or load balancing. func NewClient(host string, port string) *Client { - return &Client{host, port, http.DefaultClient} + return &Client{host, port, http.DefaultClient, ""} } // WithHTTPClient sets the http.Client to be used with the connection. Returns the original client. @@ -43,6 +43,28 @@ func (c *Client) WithHTTPClient(cl *http.Client) *Client { return c } +// Version returns the detected version of the connected ES server +func (c *Client) Version() (string, error) { + // Use cached version if it was already fetched + if c.version != "" { + return c.version, nil + } + + // Get the version if it was not cached + r := Request{Method: "GET"} + res, err := c.Do(&r) + if err != nil { + return "", err + } + if version, ok := res.Raw["version"].(map[string]interface{}); ok { + if number, ok := version["number"].(string); ok { + c.version = number + return number, nil + } + } + return "", errors.New("No version returned by ElasticSearch Server") +} + // CreateIndex creates a new index represented by a name and a mapping func (c *Client) CreateIndex(name string, mapping interface{}) (*Response, error) { r := Request{ diff --git a/goes_test.go b/goes_test.go index 869f087..ff38a06 100644 --- a/goes_test.go +++ b/goes_test.go @@ -5,7 +5,6 @@ package goes import ( - "encoding/json" "fmt" "net/http" "net/url" @@ -18,9 +17,8 @@ import ( ) var ( - ESHost = "localhost" - ESPort = "9200" - ESVersion = "0.0.0" + ESHost = "localhost" + ESPort = "9200" ) // Hook up gocheck into the gotest runner. @@ -40,24 +38,11 @@ func (s *GoesTestSuite) SetUpTest(c *C) { if p != "" { ESPort = p } - ESVersion = getESVersion(c, ESHost, ESPort) -} - -func getESVersion(c *C, host, port string) string { - res, err := http.Get(fmt.Sprintf("http://%s:%s/", host, port)) - c.Assert(err, Equals, nil) - defer res.Body.Close() - decoder := json.NewDecoder(res.Body) - - var info map[string]interface{} - err = decoder.Decode(&info) - c.Assert(err, Equals, nil) - return info["version"].(map[string]interface{})["number"].(string) } func (s *GoesTestSuite) TestNewClient(c *C) { conn := NewClient(ESHost, ESPort) - c.Assert(conn, DeepEquals, &Client{ESHost, ESPort, http.DefaultClient}) + c.Assert(conn, DeepEquals, &Client{ESHost, ESPort, http.DefaultClient, ""}) } func (s *GoesTestSuite) TestWithHTTPClient(c *C) { @@ -70,7 +55,7 @@ func (s *GoesTestSuite) TestWithHTTPClient(c *C) { } conn := NewClient(ESHost, ESPort).WithHTTPClient(cl) - c.Assert(conn, DeepEquals, &Client{ESHost, ESPort, cl}) + c.Assert(conn, DeepEquals, &Client{ESHost, ESPort, cl, ""}) c.Assert(conn.Client.Transport.(*http.Transport).DisableCompression, Equals, true) c.Assert(conn.Client.Transport.(*http.Transport).ResponseHeaderTimeout, Equals, 1*time.Second) } @@ -945,7 +930,7 @@ func (s *GoesTestSuite) TestScroll(c *C) { c.Assert(err, IsNil) var query map[string]interface{} - if ESVersion > "5" { + if version, _ := conn.Version(); version > "5" { query = map[string]interface{}{ "query": map[string]interface{}{ "bool": map[string]interface{}{ @@ -1219,7 +1204,7 @@ func (s *GoesTestSuite) TestUpdate(c *C) { // Now that we have an ordinary document indexed, try updating it var query map[string]interface{} - if ESVersion > "5" { + if version, _ := conn.Version(); version > "5" { query = map[string]interface{}{ "script": map[string]interface{}{ "inline": "ctx._source.counter += params.count", diff --git a/structs.go b/structs.go index ca08edd..75cdc5d 100644 --- a/structs.go +++ b/structs.go @@ -20,6 +20,9 @@ type Client struct { // Client is the http client used to make requests, allowing settings things // such as timeouts etc Client *http.Client + + // Detected version of ES + version string } // Response holds an elasticsearch response From 6492f3a5e386ea01d48b13579fd49d6c08e0745f Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Thu, 2 Feb 2017 13:04:45 -0600 Subject: [PATCH 05/23] Make Client.Scan work with ES 5.x --- goes.go | 39 ++++++++++++++++++++++++++++++--------- goes_test.go | 14 +++++++++----- 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/goes.go b/goes.go index edc844e..803ef52 100644 --- a/goes.go +++ b/goes.go @@ -286,10 +286,21 @@ func (c *Client) Query(query interface{}, indexList []string, typeList []string, return c.Do(&r) } -// Scan starts scroll over an index +// Scan starts scroll over an index. +// For ES versions < 5.x, it uses search_type=scan; for 5.x it uses sort=_doc. This means that data +// will be returned in the initial response for 5.x versions, but not for older versions. Code +// wishing to be compatible with both should be written to handle either case. func (c *Client) Scan(query interface{}, indexList []string, typeList []string, timeout string, size int) (*Response, error) { v := url.Values{} - v.Add("search_type", "scan") + version, err := c.Version() + if err != nil { + return nil, err + } + if version > "5" { + v.Add("sort", "_doc") + } else { + v.Add("search_type", "scan") + } v.Add("scroll", timeout) v.Add("size", strconv.Itoa(size)) @@ -307,14 +318,24 @@ func (c *Client) Scan(query interface{}, indexList []string, typeList []string, // Scroll fetches data by scroll id func (c *Client) Scroll(scrollID string, timeout string) (*Response, error) { - v := url.Values{} - v.Add("scroll", timeout) - r := Request{ - Method: "POST", - API: "_search/scroll", - ExtraArgs: v, - Body: []byte(scrollID), + Method: "POST", + API: "_search/scroll", + } + + if version, err := c.Version(); err != nil { + return nil, err + } else if version > "2" { + r.Body, err = json.Marshal(map[string]string{"scroll": timeout, "scroll_id": scrollID}) + if err != nil { + return nil, err + } + } else { + v := url.Values{} + v.Add("scroll", timeout) + v.Add("scroll_id", scrollID) + + r.ExtraArgs = v } return c.Do(&r) diff --git a/goes_test.go b/goes_test.go index ff38a06..208ca27 100644 --- a/goes_test.go +++ b/goes_test.go @@ -930,7 +930,8 @@ func (s *GoesTestSuite) TestScroll(c *C) { c.Assert(err, IsNil) var query map[string]interface{} - if version, _ := conn.Version(); version > "5" { + version, _ := conn.Version() + if version > "5" { query = map[string]interface{}{ "query": map[string]interface{}{ "bool": map[string]interface{}{ @@ -956,12 +957,15 @@ func (s *GoesTestSuite) TestScroll(c *C) { } } - scan, err := conn.Scan(query, []string{indexName}, []string{docType}, "1m", 1) + searchResults, err := conn.Scan(query, []string{indexName}, []string{docType}, "1m", 1) c.Assert(err, IsNil) - c.Assert(len(scan.ScrollID) > 0, Equals, true) + c.Assert(len(searchResults.ScrollID) > 0, Equals, true) - searchResults, err := conn.Scroll(scan.ScrollID, "1m") - c.Assert(err, IsNil) + // Versions < 5.x don't include results in the initial response + if version < "5" { + searchResults, err = conn.Scroll(searchResults.ScrollID, "1m") + c.Assert(err, IsNil) + } // some data in first chunk c.Assert(searchResults.Hits.Total, Equals, uint64(2)) From b04496cc3e2a64c5eb4dd678e4eeec19b6cfd6cc Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Thu, 2 Feb 2017 14:50:25 -0600 Subject: [PATCH 06/23] Add mapping so aggregation works in ES 5.x --- goes_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/goes_test.go b/goes_test.go index 208ca27..f9c0835 100644 --- a/goes_test.go +++ b/goes_test.go @@ -1037,6 +1037,16 @@ func (s *GoesTestSuite) TestAggregations(c *C) { "index.number_of_shards": 1, "index.number_of_replicas": 0, }, + "mappings": map[string]interface{}{ + docType: map[string]interface{}{ + "properties": map[string]interface{}{ + "user": map[string]interface{}{ + "type": "string", + "index": "not_analyzed", + }, + }, + }, + }, } defer conn.DeleteIndex(indexName) From 63b210957ad16ba567526f3a8215a9059b154850 Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Thu, 2 Feb 2017 15:08:39 -0600 Subject: [PATCH 07/23] If no body is supplied, don't send 'null' as body --- request.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/request.go b/request.go index bf8093d..00ca68f 100644 --- a/request.go +++ b/request.go @@ -82,7 +82,7 @@ func (req *Request) Request() (*http.Request, error) { postData = req.Body } else if req.API == "_bulk" { postData = req.BulkData - } else { + } else if req.Body != nil { b, err := json.Marshal(req.Query) if err != nil { return nil, err @@ -90,14 +90,12 @@ func (req *Request) Request() (*http.Request, error) { postData = b } - reader := ioutil.NopCloser(bytes.NewReader(postData)) - newReq, err := http.NewRequest(req.Method, "", nil) if err != nil { return nil, err } newReq.URL = req.URL() - newReq.Body = reader + newReq.Body = ioutil.NopCloser(bytes.NewReader(postData)) newReq.ContentLength = int64(len(postData)) if req.Method == "POST" || req.Method == "PUT" { From f82254f6d3123a566f39576c086d5c1881f1dce5 Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Thu, 2 Feb 2017 15:19:08 -0600 Subject: [PATCH 08/23] Check correct field to see if body should be empty --- request.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/request.go b/request.go index 00ca68f..9886865 100644 --- a/request.go +++ b/request.go @@ -82,7 +82,7 @@ func (req *Request) Request() (*http.Request, error) { postData = req.Body } else if req.API == "_bulk" { postData = req.BulkData - } else if req.Body != nil { + } else if req.Query != nil { b, err := json.Marshal(req.Query) if err != nil { return nil, err From 2bb228813f8578d0df16047bbbd3ebcc6d9f66ac Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Thu, 2 Feb 2017 18:03:38 -0600 Subject: [PATCH 09/23] Clear expected shards since that varies between versions --- goes_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/goes_test.go b/goes_test.go index f9c0835..f0e7e76 100644 --- a/goes_test.go +++ b/goes_test.go @@ -541,8 +541,7 @@ func (s *GoesTestSuite) TestDelete(c *C) { Version: 2, } response.Raw = nil - response.Shards.Total = 0 - response.Shards.Successful = 0 + response.Shards = Shard{} c.Assert(response, DeepEquals, expectedResponse) response, err = conn.Delete(d, url.Values{}) @@ -558,6 +557,7 @@ func (s *GoesTestSuite) TestDelete(c *C) { Version: 3, } response.Raw = nil + response.Shards = Shard{} c.Assert(response, DeepEquals, expectedResponse) } @@ -619,8 +619,7 @@ func (s *GoesTestSuite) TestDeleteByQuery(c *C) { Version: 0, } response.Raw = nil - response.Shards.Total = 0 - response.Shards.Successful = 0 + response.Shards = Shard{} c.Assert(response, DeepEquals, expectedResponse) //should be 0 docs after delete by query From 22f0f3b3be0201a65afd08789597da1f49fb4247 Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Thu, 2 Feb 2017 18:04:24 -0600 Subject: [PATCH 10/23] Add Client.DeleteByQuery rather than requiring users to pass 'DELETE' to Query --- goes.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/goes.go b/goes.go index 803ef52..d11e1fb 100644 --- a/goes.go +++ b/goes.go @@ -286,6 +286,34 @@ func (c *Client) Query(query interface{}, indexList []string, typeList []string, return c.Do(&r) } +// DeleteByQuery deletes documents matching the specified query. It will return an error for ES 2.x, +// because delete by query support was removed in those versions. +func (c *Client) DeleteByQuery(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) { + version, err := c.Version() + if err != nil { + return nil, err + } + if version > "2" && version < "5" { + return nil, errors.New("ElasticSearch 2.x does not support delete by query") + } + + r := Request{ + Query: query, + IndexList: indexList, + TypeList: typeList, + Method: "DELETE", + API: "_query", + ExtraArgs: extraArgs, + } + + if version > "5" { + r.API = "_delete_by_query" + r.Method = "POST" + } + + return c.Do(&r) +} + // Scan starts scroll over an index. // For ES versions < 5.x, it uses search_type=scan; for 5.x it uses sort=_doc. This means that data // will be returned in the initial response for 5.x versions, but not for older versions. Code From a9196feea3b664b23190881f640a1754f3e5602e Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Thu, 2 Feb 2017 18:11:37 -0600 Subject: [PATCH 11/23] Use DeleteByQuery in test --- goes_test.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/goes_test.go b/goes_test.go index f0e7e76..0643dbd 100644 --- a/goes_test.go +++ b/goes_test.go @@ -567,6 +567,8 @@ func (s *GoesTestSuite) TestDeleteByQuery(c *C) { docID := "1234" conn := NewClient(ESHost, ESPort) + version, _ := conn.Version() + // just in case conn.DeleteIndex(indexName) @@ -606,7 +608,13 @@ func (s *GoesTestSuite) TestDeleteByQuery(c *C) { c.Assert(err, IsNil) c.Assert(response.Hits.Total, Equals, uint64(1)) - response, err = conn.Query(query, []string{indexName}, []string{docType}, "DELETE", url.Values{}) + response, err = conn.DeleteByQuery(query, []string{indexName}, []string{docType}, url.Values{}) + + // There's no delete by query in ES 2.x + if version > "2" && version < "5" { + c.Assert(err, ErrorMatches, ".* does not support delete by query") + return + } c.Assert(err, IsNil) @@ -620,8 +628,12 @@ func (s *GoesTestSuite) TestDeleteByQuery(c *C) { } response.Raw = nil response.Shards = Shard{} + response.Took = 0 c.Assert(response, DeepEquals, expectedResponse) + _, err = conn.RefreshIndex(indexName) + c.Assert(err, IsNil) + //should be 0 docs after delete by query response, err = conn.Search(query, []string{indexName}, []string{docType}, url.Values{}) c.Assert(err, IsNil) From 28784db09c8a090f37ebd2d17bf54c85177fb05a Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Thu, 2 Feb 2017 18:19:27 -0600 Subject: [PATCH 12/23] Be more general about error to support ES 5.x --- goes_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/goes_test.go b/goes_test.go index 0643dbd..bd2bab8 100644 --- a/goes_test.go +++ b/goes_test.go @@ -151,9 +151,10 @@ func (s *GoesTestSuite) TestDeleteIndexInexistantIndex(c *C) { conn := NewClient(ESHost, ESPort) resp, err := conn.DeleteIndex("foobar") - c.Assert(err.Error(), Equals, "[404] IndexMissingException[[foobar] missing]") + c.Assert(err.Error(), Matches, "\\[404\\] .*foobar.*") resp.Raw = nil // Don't make us have to duplicate this. - c.Assert(resp, DeepEquals, &Response{Status: 404, Error: "IndexMissingException[[foobar] missing]"}) + c.Assert(resp.Status, Equals, uint64(404)) + c.Assert(resp.Error, Matches, ".*foobar.*") } func (s *GoesTestSuite) TestDeleteIndexExistingIndex(c *C) { From 7ee055cb7dcfd82428b4bb450c0d1b608c2d1e59 Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Thu, 2 Feb 2017 18:28:09 -0600 Subject: [PATCH 13/23] Delete mapping is no longer supported in 2.x+ versions of ES --- goes.go | 5 +++++ goes_test.go | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/goes.go b/goes.go index d11e1fb..0e5513b 100644 --- a/goes.go +++ b/goes.go @@ -504,6 +504,11 @@ func (c *Client) Update(d Document, query interface{}, extraArgs url.Values) (*R // DeleteMapping deletes a mapping along with all data in the type func (c *Client) DeleteMapping(typeName string, indexes []string) (*Response, error) { + if version, err := c.Version(); err != nil { + return nil, err + } else if version > "2" { + return nil, errors.New("Deletion of mappings is not supported in ES 2.x and above.") + } r := Request{ IndexList: indexes, diff --git a/goes_test.go b/goes_test.go index bd2bab8..cd0ba7c 100644 --- a/goes_test.go +++ b/goes_test.go @@ -1365,6 +1365,10 @@ func (s *GoesTestSuite) TestDeleteMapping(c *C) { time.Sleep(200 * time.Millisecond) response, err = conn.DeleteMapping("tweet", []string{indexName}) + if version, _ := conn.Version(); version > "2" { + c.Assert(err, ErrorMatches, ".*not supported.*") + return + } c.Assert(err, IsNil) c.Assert(response.Acknowledged, Equals, true) From 19c83bfeff7ca44b3171c24bd80cd7253362326f Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Thu, 2 Feb 2017 18:44:49 -0600 Subject: [PATCH 14/23] fields no longer supported in 5.x --- goes_test.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/goes_test.go b/goes_test.go index cd0ba7c..58ea716 100644 --- a/goes_test.go +++ b/goes_test.go @@ -651,6 +651,7 @@ func (s *GoesTestSuite) TestGet(c *C) { } conn := NewClient(ESHost, ESPort) + version, _ := conn.Version() conn.DeleteIndex(indexName) _, err := conn.CreateIndex(indexName, map[string]interface{}{}) @@ -683,11 +684,6 @@ func (s *GoesTestSuite) TestGet(c *C) { response.Raw = nil c.Assert(response, DeepEquals, expectedResponse) - fields := make(url.Values, 1) - fields.Set("fields", "f1") - response, err = conn.Get(indexName, docType, docID, fields) - c.Assert(err, IsNil) - expectedResponse = &Response{ Status: 200, Index: indexName, @@ -700,6 +696,18 @@ func (s *GoesTestSuite) TestGet(c *C) { }, } + fields := make(url.Values, 1) + // The fields param is no longer supported in ES 5.x + if version < "5" { + fields.Set("fields", "f1") + } else { + expectedResponse.Source = map[string]interface{}{"f1": "foo"} + expectedResponse.Fields = nil + fields.Set("_source", "f1") + } + response, err = conn.Get(indexName, docType, docID, fields) + c.Assert(err, IsNil) + response.Raw = nil c.Assert(response, DeepEquals, expectedResponse) } From 4b35e6f0cff5fd5e7864c7c6670f21178fd44835 Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Thu, 2 Feb 2017 18:54:29 -0600 Subject: [PATCH 15/23] Fix up for 5.x, not really sure why ttl was set in the first place --- goes_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/goes_test.go b/goes_test.go index 58ea716..7e61aeb 100644 --- a/goes_test.go +++ b/goes_test.go @@ -457,9 +457,7 @@ func (s *GoesTestSuite) TestIndexIdDefined(c *C) { }, } - extraArgs := make(url.Values, 1) - extraArgs.Set("ttl", "86400000") - response, err := conn.Index(d, extraArgs) + response, err := conn.Index(d, nil) c.Assert(err, IsNil) expectedResponse := &Response{ @@ -471,6 +469,7 @@ func (s *GoesTestSuite) TestIndexIdDefined(c *C) { } response.Raw = nil + response.Shards = Shard{} c.Assert(response, DeepEquals, expectedResponse) } From a6663ce61d1ba929f8f4d2c3037da781b4ed19d7 Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Thu, 2 Feb 2017 19:08:33 -0600 Subject: [PATCH 16/23] _optimize was renamed to _forcemerge in ES 2.1.0 --- goes.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/goes.go b/goes.go index 0e5513b..f99976b 100644 --- a/goes.go +++ b/goes.go @@ -119,10 +119,18 @@ func (c *Client) Optimize(indexList []string, extraArgs url.Values) (*Response, Method: "POST", API: "_optimize", } + if version, _ := c.Version(); version > "2.1" { + r.API = "_forcemerge" + } return c.Do(&r) } +// ForceMerge is the same as Optimize, but matches the naming of the endpoint as of ES 2.1.0 +func (c *Client) ForceMerge(indexList []string, extraArgs url.Values) (*Response, error) { + return c.Optimize(indexList, extraArgs) +} + // Stats fetches statistics (_stats) for the current elasticsearch server func (c *Client) Stats(indexList []string, extraArgs url.Values) (*Response, error) { r := Request{ From 6c0739bec7be470cc5bcb13a1acf41e41371dc7f Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Thu, 2 Feb 2017 19:09:04 -0600 Subject: [PATCH 17/23] More TTL and shard zeroing --- goes_test.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/goes_test.go b/goes_test.go index 7e61aeb..a0b9e25 100644 --- a/goes_test.go +++ b/goes_test.go @@ -391,9 +391,7 @@ func (s *GoesTestSuite) TestIndexWithFieldsInStruct(c *C) { }, } - extraArgs := make(url.Values, 1) - extraArgs.Set("ttl", "86400000") - response, err := conn.Index(d, extraArgs) + response, err := conn.Index(d, nil) c.Assert(err, IsNil) expectedResponse := &Response{ @@ -405,6 +403,7 @@ func (s *GoesTestSuite) TestIndexWithFieldsInStruct(c *C) { } response.Raw = nil + response.Shards = Shard{} c.Assert(response, DeepEquals, expectedResponse) } @@ -428,9 +427,7 @@ func (s *GoesTestSuite) TestIndexWithFieldsNotInMapOrStruct(c *C) { Fields: "test", } - extraArgs := make(url.Values, 1) - extraArgs.Set("ttl", "86400000") - _, err = conn.Index(d, extraArgs) + _, err = conn.Index(d, nil) c.Assert(err, Not(IsNil)) } From 49e38a74a2c07fece74d3180744f7606a371de28 Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Thu, 2 Feb 2017 19:09:19 -0600 Subject: [PATCH 18/23] Status endpoint is gone from 2.0 on --- goes_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/goes_test.go b/goes_test.go index a0b9e25..e776b5b 100644 --- a/goes_test.go +++ b/goes_test.go @@ -817,6 +817,12 @@ func (s *GoesTestSuite) TestCount(c *C) { func (s *GoesTestSuite) TestIndexStatus(c *C) { indexName := "testindexstatus" conn := NewClient(ESHost, ESPort) + + // _status endpoint was removed in ES 2.0 + if version, _ := conn.Version(); version > "2" { + return + } + conn.DeleteIndex(indexName) mapping := map[string]interface{}{ From 782ceb74a5d264dfc4f602be18fc448195fd4ee8 Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Thu, 2 Feb 2017 19:19:51 -0600 Subject: [PATCH 19/23] Be a bit more flexible with error messages --- goes_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/goes_test.go b/goes_test.go index e776b5b..48c504c 100644 --- a/goes_test.go +++ b/goes_test.go @@ -115,7 +115,7 @@ func (s *GoesTestSuite) TestRunMissingIndex(c *C) { } _, err := conn.Do(&r) - c.Assert(err.Error(), Equals, "[404] IndexMissingException[[i] missing]") + c.Assert(err.Error(), Matches, "\\[40.\\] .*i.*") } func (s *GoesTestSuite) TestCreateIndex(c *C) { @@ -1473,7 +1473,7 @@ func (s *GoesTestSuite) TestRemoveAlias(c *C) { // Get document via alias _, err = conn.Get(aliasName, docType, docID, url.Values{}) - c.Assert(err.Error(), Equals, "[404] IndexMissingException[["+aliasName+"] missing]") + c.Assert(err.Error(), Matches, "\\[404\\] .*"+aliasName+".*") } func (s *GoesTestSuite) TestAliasExists(c *C) { From 167e20d669754e219f4eefbb1b9231762b4fd415 Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Thu, 2 Feb 2017 19:20:07 -0600 Subject: [PATCH 20/23] Make sure we delete those test indexes, even on failure --- goes_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/goes_test.go b/goes_test.go index 48c504c..8a82d1a 100644 --- a/goes_test.go +++ b/goes_test.go @@ -163,6 +163,7 @@ func (s *GoesTestSuite) TestDeleteIndexExistingIndex(c *C) { indexName := "testdeleteindexexistingindex" _, err := conn.CreateIndex(indexName, map[string]interface{}{}) + defer conn.DeleteIndex(indexName) c.Assert(err, IsNil) @@ -181,8 +182,12 @@ func (s *GoesTestSuite) TestUpdateIndexSettings(c *C) { conn := NewClient(ESHost, ESPort) indexName := "testupdateindex" + // Just in case + conn.DeleteIndex(indexName) + _, err := conn.CreateIndex(indexName, map[string]interface{}{}) c.Assert(err, IsNil) + defer conn.DeleteIndex(indexName) _, err = conn.UpdateIndexSettings(indexName, map[string]interface{}{ "index": map[string]interface{}{ @@ -201,6 +206,7 @@ func (s *GoesTestSuite) TestRefreshIndex(c *C) { _, err := conn.CreateIndex(indexName, map[string]interface{}{}) c.Assert(err, IsNil) + defer conn.DeleteIndex(indexName) _, err = conn.RefreshIndex(indexName) c.Assert(err, IsNil) @@ -216,6 +222,7 @@ func (s *GoesTestSuite) TestOptimize(c *C) { conn.DeleteIndex(indexName) _, err := conn.CreateIndex(indexName, map[string]interface{}{}) c.Assert(err, IsNil) + defer conn.DeleteIndex(indexName) // we must wait for a bit otherwise ES crashes time.Sleep(1 * time.Second) @@ -262,6 +269,7 @@ func (s *GoesTestSuite) TestBulkSend(c *C) { conn.DeleteIndex(indexName) _, err := conn.CreateIndex(indexName, nil) c.Assert(err, IsNil) + defer conn.DeleteIndex(indexName) response, err := conn.BulkSend(tweets) i := Item{ @@ -352,6 +360,7 @@ func (s *GoesTestSuite) TestStats(c *C) { conn.DeleteIndex(indexName) _, err := conn.CreateIndex(indexName, map[string]interface{}{}) c.Assert(err, IsNil) + defer conn.DeleteIndex(indexName) // we must wait for a bit otherwise ES crashes time.Sleep(1 * time.Second) From 42e5dfaa7358386ce6ba13dd22362a6a36d85c26 Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Thu, 2 Feb 2017 19:22:12 -0600 Subject: [PATCH 21/23] Remove leftover debug print --- goes_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/goes_test.go b/goes_test.go index 8a82d1a..0050a36 100644 --- a/goes_test.go +++ b/goes_test.go @@ -5,7 +5,6 @@ package goes import ( - "fmt" "net/http" "net/url" "os" @@ -1280,7 +1279,7 @@ func (s *GoesTestSuite) TestUpdate(c *C) { } response, err = conn.Update(d, query, extraArgs) - fmt.Println(response) + if err != nil && strings.Contains(err.(*SearchError).Msg, "dynamic scripting") { c.Skip("Scripting is disabled on server, skipping this test") return From 3a66a8f6d3fdfad7998e690ac8485ddb49d58267 Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Thu, 2 Feb 2017 19:33:06 -0600 Subject: [PATCH 22/23] Use the latest 1.x, 2.x, 5.x ES, build with Go 1.6 and 1.7 --- .travis.yml | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/.travis.yml b/.travis.yml index 2b494e8..123cbbc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,27 +1,31 @@ language: go +addons: + apt: + packages: + - oracle-java8-set-default + go: - - 1.5.4 - - 1.6.3 - - 1.7.1 + - 1.6.4 + - 1.7.5 env: global: - - GO15VENDOREXPERIMENT=1 + - JAVA_HOME=/usr/lib/jvm/java-8-oracle matrix: - - ES_VERSION=1.3.4 - - ES_VERSION=1.4.4 - - ES_VERSION=1.5.2 - - ES_VERSION=1.6.0 - - ES_VERSION=1.7.0 + - ES_VERSION=1.7.5 ES_URL=https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.5.tar.gz + - ES_VERSION=2.4.4 ES_URL=https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.4.4/elasticsearch-2.4.4.tar.gz + - ES_VERSION=5.2.0 ES_URL=https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.2.0.tar.gz before_script: + - java -version + - echo $JAVA_HOME - mkdir ${HOME}/elasticsearch - - wget https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-${ES_VERSION}.tar.gz + - wget $ES_URL - tar -xzf elasticsearch-${ES_VERSION}.tar.gz -C ${HOME}/elasticsearch - - "echo 'script.groovy.sandbox.enabled: true' >> ${HOME}/elasticsearch/elasticsearch-${ES_VERSION}/config/elasticsearch.yml" - - ${HOME}/elasticsearch/elasticsearch-${ES_VERSION}/bin/elasticsearch >/dev/null & - - sleep 10 # Wait for ES to start up + - "echo 'script.inline: true' >> ${HOME}/elasticsearch/elasticsearch-${ES_VERSION}/config/elasticsearch.yml" + - ${HOME}/elasticsearch/elasticsearch-${ES_VERSION}/bin/elasticsearch & + - wget --retry-connrefused http://127.0.0.1:9200/ # Wait for ES to start up install: - go get github.com/Masterminds/glide From 4b222647b4a97b2f84aa590be071c7481394c225 Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Fri, 3 Feb 2017 14:06:31 -0600 Subject: [PATCH 23/23] Add ability to do a request and just get the raw response bytes back --- goes.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/goes.go b/goes.go index f99976b..5ff3d1c 100644 --- a/goes.go +++ b/goes.go @@ -573,14 +573,28 @@ func (c *Client) AliasExists(alias string) (bool, error) { return resp.Status == 200, err } +func (c *Client) replaceHost(req *http.Request) { + req.URL.Scheme = "http" + req.URL.Host = fmt.Sprintf("%s:%s", c.Host, c.Port) +} + +// DoRaw Does the provided requeset and returns the raw bytes and the status code of the response +func (c *Client) DoRaw(r Requester) ([]byte, uint64, error) { + req, err := r.Request() + if err != nil { + return nil, 0, err + } + c.replaceHost(req) + return c.doRequest(req) +} + // Do runs the request returned by the requestor and returns the parsed response func (c *Client) Do(r Requester) (*Response, error) { req, err := r.Request() if err != nil { return &Response{}, err } - req.URL.Scheme = "http" - req.URL.Host = fmt.Sprintf("%s:%s", c.Host, c.Port) + c.replaceHost(req) body, statusCode, err := c.doRequest(req) esResp := &Response{Status: statusCode}