Merge pull request #3 from bobrik/scroll

Scrolling support
This commit is contained in:
Jérôme Renard 2014-01-11 10:12:04 -08:00
commit caac0b3f53
3 changed files with 146 additions and 1 deletions

41
goes.go
View File

@ -13,6 +13,7 @@ import (
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
)
@ -177,6 +178,42 @@ func (c *Connection) Search(query map[string]interface{}, indexList []string, ty
return r.Run()
}
// Scan starts scroll over an index
func (c *Connection) Scan(query map[string]interface{}, indexList []string, typeList []string, timeout string, size int) (Response, error) {
v := url.Values{}
v.Add("search_type", "scan")
v.Add("scroll", timeout)
v.Add("size", strconv.Itoa(size))
r := Request{
Conn: c,
Query: query,
IndexList: indexList,
TypeList: typeList,
method: "POST",
api: "_search",
ExtraArgs: v,
}
return r.Run()
}
// Scroll fetches data by scroll id
func (c *Connection) Scroll(scrollId string, timeout string) (Response, error) {
v := url.Values{}
v.Add("scroll", timeout)
r := Request{
Conn: c,
method: "POST",
api: "_search/scroll",
ExtraArgs: v,
Body: []byte(scrollId),
}
return r.Run()
}
// Get a typed document by its id
func (c *Connection) Get(index string, documentType string, id string, extraArgs url.Values) (Response, error) {
r := Request{
@ -233,7 +270,9 @@ func (req *Request) Run() (Response, error) {
postData := []byte{}
// XXX : refactor this
if req.api == "_bulk" {
if len(req.Body) > 0 {
postData = req.Body
} else if req.api == "_bulk" {
postData = req.bulkData
} else {
b, err := json.Marshal(req.Query)

View File

@ -623,3 +623,103 @@ func (s *GoesTestSuite) TestIndexStatus(c *C) {
c.Assert(response.Indices, DeepEquals, expectedIndices)
}
func (s *GoesTestSuite) TestScroll(c *C) {
indexName := "testscroll"
docType := "tweet"
tweets := []Document{
Document{
Id: nil,
Index: indexName,
Type: docType,
BulkCommand: BULK_COMMAND_INDEX,
Fields: map[string]interface{}{
"user": "foo",
"message": "some foo message",
},
},
Document{
Id: nil,
Index: indexName,
Type: docType,
BulkCommand: BULK_COMMAND_INDEX,
Fields: map[string]interface{}{
"user": "bar",
"message": "some bar message",
},
},
Document{
Id: nil,
Index: indexName,
Type: docType,
BulkCommand: BULK_COMMAND_INDEX,
Fields: map[string]interface{}{
"user": "foo",
"message": "another foo message",
},
},
}
conn := NewConnection(ES_HOST, ES_PORT)
mapping := map[string]interface{}{
"settings": map[string]interface{}{
"index.number_of_shards": 1,
"index.number_of_replicas": 0,
},
}
defer conn.DeleteIndex(indexName)
_, err := conn.CreateIndex(indexName, mapping)
c.Assert(err, IsNil)
_, err = conn.BulkSend(indexName, tweets)
c.Assert(err, IsNil)
_, err = conn.RefreshIndex(indexName)
c.Assert(err, IsNil)
query := map[string]interface{}{
"query": map[string]interface{}{
"filtered": map[string]interface{}{
"filter": map[string]interface{}{
"term": map[string]interface{}{
"user": "foo",
},
},
},
},
}
scan, err := conn.Scan(query, []string{indexName}, []string{docType}, "1m", 1)
c.Assert(err, IsNil)
c.Assert(len(scan.ScrollId) > 0, Equals, true)
searchResults, err := conn.Scroll(scan.ScrollId, "1m")
c.Assert(err, IsNil)
// some data in first chunk
c.Assert(searchResults.Hits.Total, Equals, uint64(2))
c.Assert(len(searchResults.ScrollId) > 0, Equals, true)
c.Assert(len(searchResults.Hits.Hits), Equals, 1)
searchResults, err = conn.Scroll(searchResults.ScrollId, "1m")
c.Assert(err, IsNil)
// more data in second chunk
c.Assert(searchResults.Hits.Total, Equals, uint64(2))
c.Assert(len(searchResults.ScrollId) > 0, Equals, true)
c.Assert(len(searchResults.Hits.Hits), Equals, 1)
searchResults, err = conn.Scroll(searchResults.ScrollId, "1m")
c.Assert(err, IsNil)
// nothing in third chunk
c.Assert(searchResults.Hits.Total, Equals, uint64(2))
c.Assert(len(searchResults.ScrollId) > 0, Equals, true)
c.Assert(len(searchResults.Hits.Hits), Equals, 0)
}

View File

@ -40,6 +40,9 @@ type Request struct {
// Bulk data
bulkData []byte
// Request body
Body []byte
// A list of extra URL arguments
ExtraArgs url.Values
@ -76,6 +79,9 @@ type Response struct {
// Used by the _status API
Indices map[string]IndexStatus
// Scroll id for iteration
ScrollId string `json:"_scroll_id"`
}
// Represents a document to send to elasticsearch