Added initial scroll support
This commit is contained in:
parent
4f4a9961b2
commit
5356d04035
41
goes.go
41
goes.go
@ -13,6 +13,7 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -177,6 +178,42 @@ func (c *Connection) Search(query map[string]interface{}, indexList []string, ty
|
|||||||
return r.Run()
|
return r.Run()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Scan starts scroll over an index
|
||||||
|
func (c *Connection) Scan(query map[string]interface{}, indexList []string, typeList []string, timeout string, size int) (Response, error) {
|
||||||
|
v := url.Values{}
|
||||||
|
v.Add("search_type", "scan")
|
||||||
|
v.Add("scroll", timeout)
|
||||||
|
v.Add("size", strconv.Itoa(size))
|
||||||
|
|
||||||
|
r := Request{
|
||||||
|
Conn: c,
|
||||||
|
Query: query,
|
||||||
|
IndexList: indexList,
|
||||||
|
TypeList: typeList,
|
||||||
|
method: "POST",
|
||||||
|
api: "_search",
|
||||||
|
ExtraArgs: v,
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.Run()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Scroll fetches data by scroll id
|
||||||
|
func (c *Connection) Scroll(scrollId string, timeout string) (Response, error) {
|
||||||
|
v := url.Values{}
|
||||||
|
v.Add("scroll", timeout)
|
||||||
|
|
||||||
|
r := Request{
|
||||||
|
Conn: c,
|
||||||
|
method: "POST",
|
||||||
|
api: "_search/scroll",
|
||||||
|
ExtraArgs: v,
|
||||||
|
Body: []byte(scrollId),
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.Run()
|
||||||
|
}
|
||||||
|
|
||||||
// Get a typed document by its id
|
// Get a typed document by its id
|
||||||
func (c *Connection) Get(index string, documentType string, id string, extraArgs url.Values) (Response, error) {
|
func (c *Connection) Get(index string, documentType string, id string, extraArgs url.Values) (Response, error) {
|
||||||
r := Request{
|
r := Request{
|
||||||
@ -233,7 +270,9 @@ func (req *Request) Run() (Response, error) {
|
|||||||
postData := []byte{}
|
postData := []byte{}
|
||||||
|
|
||||||
// XXX : refactor this
|
// XXX : refactor this
|
||||||
if req.api == "_bulk" {
|
if len(req.Body) > 0 {
|
||||||
|
postData = req.Body
|
||||||
|
} else if req.api == "_bulk" {
|
||||||
postData = req.bulkData
|
postData = req.bulkData
|
||||||
} else {
|
} else {
|
||||||
b, err := json.Marshal(req.Query)
|
b, err := json.Marshal(req.Query)
|
||||||
|
100
goes_test.go
100
goes_test.go
@ -623,3 +623,103 @@ func (s *GoesTestSuite) TestIndexStatus(c *C) {
|
|||||||
|
|
||||||
c.Assert(response.Indices, DeepEquals, expectedIndices)
|
c.Assert(response.Indices, DeepEquals, expectedIndices)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *GoesTestSuite) TestScroll(c *C) {
|
||||||
|
indexName := "testscroll"
|
||||||
|
docType := "tweet"
|
||||||
|
|
||||||
|
tweets := []Document{
|
||||||
|
Document{
|
||||||
|
Id: nil,
|
||||||
|
Index: indexName,
|
||||||
|
Type: docType,
|
||||||
|
BulkCommand: BULK_COMMAND_INDEX,
|
||||||
|
Fields: map[string]interface{}{
|
||||||
|
"user": "foo",
|
||||||
|
"message": "some foo message",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
|
||||||
|
Document{
|
||||||
|
Id: nil,
|
||||||
|
Index: indexName,
|
||||||
|
Type: docType,
|
||||||
|
BulkCommand: BULK_COMMAND_INDEX,
|
||||||
|
Fields: map[string]interface{}{
|
||||||
|
"user": "bar",
|
||||||
|
"message": "some bar message",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
|
||||||
|
Document{
|
||||||
|
Id: nil,
|
||||||
|
Index: indexName,
|
||||||
|
Type: docType,
|
||||||
|
BulkCommand: BULK_COMMAND_INDEX,
|
||||||
|
Fields: map[string]interface{}{
|
||||||
|
"user": "foo",
|
||||||
|
"message": "another foo message",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
conn := NewConnection(ES_HOST, ES_PORT)
|
||||||
|
|
||||||
|
mapping := map[string]interface{}{
|
||||||
|
"settings": map[string]interface{}{
|
||||||
|
"index.number_of_shards": 1,
|
||||||
|
"index.number_of_replicas": 0,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
defer conn.DeleteIndex(indexName)
|
||||||
|
|
||||||
|
_, err := conn.CreateIndex(indexName, mapping)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
_, err = conn.BulkSend(indexName, tweets)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
_, err = conn.RefreshIndex(indexName)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
query := map[string]interface{}{
|
||||||
|
"query": map[string]interface{}{
|
||||||
|
"filtered": map[string]interface{}{
|
||||||
|
"filter": map[string]interface{}{
|
||||||
|
"term": map[string]interface{}{
|
||||||
|
"user": "foo",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
scan, err := conn.Scan(query, []string{indexName}, []string{docType}, "1m", 1)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
c.Assert(len(scan.ScrollId) > 0, Equals, true)
|
||||||
|
|
||||||
|
searchResults, err := conn.Scroll(scan.ScrollId, "1m")
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
// some data in first chunk
|
||||||
|
c.Assert(searchResults.Hits.Total, Equals, uint64(2))
|
||||||
|
c.Assert(len(searchResults.ScrollId) > 0, Equals, true)
|
||||||
|
c.Assert(len(searchResults.Hits.Hits), Equals, 1)
|
||||||
|
|
||||||
|
searchResults, err = conn.Scroll(searchResults.ScrollId, "1m")
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
// more data in second chunk
|
||||||
|
c.Assert(searchResults.Hits.Total, Equals, uint64(2))
|
||||||
|
c.Assert(len(searchResults.ScrollId) > 0, Equals, true)
|
||||||
|
c.Assert(len(searchResults.Hits.Hits), Equals, 1)
|
||||||
|
|
||||||
|
searchResults, err = conn.Scroll(searchResults.ScrollId, "1m")
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
// nothing in third chunk
|
||||||
|
c.Assert(searchResults.Hits.Total, Equals, uint64(2))
|
||||||
|
c.Assert(len(searchResults.ScrollId) > 0, Equals, true)
|
||||||
|
c.Assert(len(searchResults.Hits.Hits), Equals, 0)
|
||||||
|
}
|
||||||
|
@ -40,6 +40,9 @@ type Request struct {
|
|||||||
// Bulk data
|
// Bulk data
|
||||||
bulkData []byte
|
bulkData []byte
|
||||||
|
|
||||||
|
// Request body
|
||||||
|
Body []byte
|
||||||
|
|
||||||
// A list of extra URL arguments
|
// A list of extra URL arguments
|
||||||
ExtraArgs url.Values
|
ExtraArgs url.Values
|
||||||
|
|
||||||
@ -76,6 +79,9 @@ type Response struct {
|
|||||||
|
|
||||||
// Used by the _status API
|
// Used by the _status API
|
||||||
Indices map[string]IndexStatus
|
Indices map[string]IndexStatus
|
||||||
|
|
||||||
|
// Scroll id for iteration
|
||||||
|
ScrollId string `json:"_scroll_id"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Represents a document to send to elasticsearch
|
// Represents a document to send to elasticsearch
|
||||||
|
Loading…
Reference in New Issue
Block a user