Make Client.Scan work with ES 5.x

This commit is contained in:
Paul Bonser 2017-02-02 13:04:45 -06:00
parent f9192a7ca8
commit 6492f3a5e3
2 changed files with 39 additions and 14 deletions

39
goes.go
View File

@ -286,10 +286,21 @@ func (c *Client) Query(query interface{}, indexList []string, typeList []string,
return c.Do(&r) return c.Do(&r)
} }
// Scan starts scroll over an index // Scan starts scroll over an index.
// For ES versions < 5.x, it uses search_type=scan; for 5.x it uses sort=_doc. This means that data
// will be returned in the initial response for 5.x versions, but not for older versions. Code
// wishing to be compatible with both should be written to handle either case.
func (c *Client) Scan(query interface{}, indexList []string, typeList []string, timeout string, size int) (*Response, error) { func (c *Client) Scan(query interface{}, indexList []string, typeList []string, timeout string, size int) (*Response, error) {
v := url.Values{} v := url.Values{}
v.Add("search_type", "scan") version, err := c.Version()
if err != nil {
return nil, err
}
if version > "5" {
v.Add("sort", "_doc")
} else {
v.Add("search_type", "scan")
}
v.Add("scroll", timeout) v.Add("scroll", timeout)
v.Add("size", strconv.Itoa(size)) v.Add("size", strconv.Itoa(size))
@ -307,14 +318,24 @@ func (c *Client) Scan(query interface{}, indexList []string, typeList []string,
// Scroll fetches data by scroll id // Scroll fetches data by scroll id
func (c *Client) Scroll(scrollID string, timeout string) (*Response, error) { func (c *Client) Scroll(scrollID string, timeout string) (*Response, error) {
v := url.Values{}
v.Add("scroll", timeout)
r := Request{ r := Request{
Method: "POST", Method: "POST",
API: "_search/scroll", API: "_search/scroll",
ExtraArgs: v, }
Body: []byte(scrollID),
if version, err := c.Version(); err != nil {
return nil, err
} else if version > "2" {
r.Body, err = json.Marshal(map[string]string{"scroll": timeout, "scroll_id": scrollID})
if err != nil {
return nil, err
}
} else {
v := url.Values{}
v.Add("scroll", timeout)
v.Add("scroll_id", scrollID)
r.ExtraArgs = v
} }
return c.Do(&r) return c.Do(&r)

View File

@ -930,7 +930,8 @@ func (s *GoesTestSuite) TestScroll(c *C) {
c.Assert(err, IsNil) c.Assert(err, IsNil)
var query map[string]interface{} var query map[string]interface{}
if version, _ := conn.Version(); version > "5" { version, _ := conn.Version()
if version > "5" {
query = map[string]interface{}{ query = map[string]interface{}{
"query": map[string]interface{}{ "query": map[string]interface{}{
"bool": map[string]interface{}{ "bool": map[string]interface{}{
@ -956,12 +957,15 @@ func (s *GoesTestSuite) TestScroll(c *C) {
} }
} }
scan, err := conn.Scan(query, []string{indexName}, []string{docType}, "1m", 1) searchResults, err := conn.Scan(query, []string{indexName}, []string{docType}, "1m", 1)
c.Assert(err, IsNil) c.Assert(err, IsNil)
c.Assert(len(scan.ScrollID) > 0, Equals, true) c.Assert(len(searchResults.ScrollID) > 0, Equals, true)
searchResults, err := conn.Scroll(scan.ScrollID, "1m") // Versions < 5.x don't include results in the initial response
c.Assert(err, IsNil) if version < "5" {
searchResults, err = conn.Scroll(searchResults.ScrollID, "1m")
c.Assert(err, IsNil)
}
// some data in first chunk // some data in first chunk
c.Assert(searchResults.Hits.Total, Equals, uint64(2)) c.Assert(searchResults.Hits.Total, Equals, uint64(2))