diff --git a/goes.go b/goes.go index 6a2a0ec..ea3ef74 100644 --- a/goes.go +++ b/goes.go @@ -13,6 +13,7 @@ import ( "io/ioutil" "net/http" "net/url" + "strconv" "strings" ) @@ -177,6 +178,42 @@ func (c *Connection) Search(query map[string]interface{}, indexList []string, ty return r.Run() } +// Scan starts scroll over an index +func (c *Connection) Scan(query map[string]interface{}, indexList []string, typeList []string, timeout string, size int) (Response, error) { + v := url.Values{} + v.Add("search_type", "scan") + v.Add("scroll", timeout) + v.Add("size", strconv.Itoa(size)) + + r := Request{ + Conn: c, + Query: query, + IndexList: indexList, + TypeList: typeList, + method: "POST", + api: "_search", + ExtraArgs: v, + } + + return r.Run() +} + +// Scroll fetches data by scroll id +func (c *Connection) Scroll(scrollId string, timeout string) (Response, error) { + v := url.Values{} + v.Add("scroll", timeout) + + r := Request{ + Conn: c, + method: "POST", + api: "_search/scroll", + ExtraArgs: v, + Body: []byte(scrollId), + } + + return r.Run() +} + // Get a typed document by its id func (c *Connection) Get(index string, documentType string, id string, extraArgs url.Values) (Response, error) { r := Request{ @@ -233,7 +270,9 @@ func (req *Request) Run() (Response, error) { postData := []byte{} // XXX : refactor this - if req.api == "_bulk" { + if len(req.Body) > 0 { + postData = req.Body + } else if req.api == "_bulk" { postData = req.bulkData } else { b, err := json.Marshal(req.Query) diff --git a/goes_test.go b/goes_test.go index 075e4c0..8a1c8f8 100644 --- a/goes_test.go +++ b/goes_test.go @@ -623,3 +623,103 @@ func (s *GoesTestSuite) TestIndexStatus(c *C) { c.Assert(response.Indices, DeepEquals, expectedIndices) } + +func (s *GoesTestSuite) TestScroll(c *C) { + indexName := "testscroll" + docType := "tweet" + + tweets := []Document{ + Document{ + Id: nil, + Index: indexName, + Type: docType, + BulkCommand: BULK_COMMAND_INDEX, + Fields: map[string]interface{}{ + "user": "foo", + "message": "some foo message", + }, + }, + + Document{ + Id: nil, + Index: indexName, + Type: docType, + BulkCommand: BULK_COMMAND_INDEX, + Fields: map[string]interface{}{ + "user": "bar", + "message": "some bar message", + }, + }, + + Document{ + Id: nil, + Index: indexName, + Type: docType, + BulkCommand: BULK_COMMAND_INDEX, + Fields: map[string]interface{}{ + "user": "foo", + "message": "another foo message", + }, + }, + } + + conn := NewConnection(ES_HOST, ES_PORT) + + mapping := map[string]interface{}{ + "settings": map[string]interface{}{ + "index.number_of_shards": 1, + "index.number_of_replicas": 0, + }, + } + + defer conn.DeleteIndex(indexName) + + _, err := conn.CreateIndex(indexName, mapping) + c.Assert(err, IsNil) + + _, err = conn.BulkSend(indexName, tweets) + c.Assert(err, IsNil) + + _, err = conn.RefreshIndex(indexName) + c.Assert(err, IsNil) + + query := map[string]interface{}{ + "query": map[string]interface{}{ + "filtered": map[string]interface{}{ + "filter": map[string]interface{}{ + "term": map[string]interface{}{ + "user": "foo", + }, + }, + }, + }, + } + + scan, err := conn.Scan(query, []string{indexName}, []string{docType}, "1m", 1) + c.Assert(err, IsNil) + c.Assert(len(scan.ScrollId) > 0, Equals, true) + + searchResults, err := conn.Scroll(scan.ScrollId, "1m") + c.Assert(err, IsNil) + + // some data in first chunk + c.Assert(searchResults.Hits.Total, Equals, uint64(2)) + c.Assert(len(searchResults.ScrollId) > 0, Equals, true) + c.Assert(len(searchResults.Hits.Hits), Equals, 1) + + searchResults, err = conn.Scroll(searchResults.ScrollId, "1m") + c.Assert(err, IsNil) + + // more data in second chunk + c.Assert(searchResults.Hits.Total, Equals, uint64(2)) + c.Assert(len(searchResults.ScrollId) > 0, Equals, true) + c.Assert(len(searchResults.Hits.Hits), Equals, 1) + + searchResults, err = conn.Scroll(searchResults.ScrollId, "1m") + c.Assert(err, IsNil) + + // nothing in third chunk + c.Assert(searchResults.Hits.Total, Equals, uint64(2)) + c.Assert(len(searchResults.ScrollId) > 0, Equals, true) + c.Assert(len(searchResults.Hits.Hits), Equals, 0) +} diff --git a/structs.go b/structs.go index d2fba37..afab460 100644 --- a/structs.go +++ b/structs.go @@ -40,6 +40,9 @@ type Request struct { // Bulk data bulkData []byte + // Request body + Body []byte + // A list of extra URL arguments ExtraArgs url.Values @@ -76,6 +79,9 @@ type Response struct { // Used by the _status API Indices map[string]IndexStatus + + // Scroll id for iteration + ScrollId string `json:"_scroll_id"` } // Represents a document to send to elasticsearch