Merge pull request #19 from marinbek/master
Support for PutMapping, Update and IndicesExist APIs
This commit is contained in:
commit
63cef24bc9
1
.gitignore
vendored
1
.gitignore
vendored
@ -1 +1,2 @@
|
||||
*.test
|
||||
*.swp
|
||||
|
71
goes.go
71
goes.go
@ -345,9 +345,14 @@ func (req *Request) Run() (Response, error) {
|
||||
}
|
||||
|
||||
esResp := new(Response)
|
||||
err = json.Unmarshal(body, &esResp)
|
||||
if err != nil {
|
||||
return Response{}, err
|
||||
if req.method == "HEAD" {
|
||||
esResp.Status = uint64(resp.StatusCode)
|
||||
} else {
|
||||
err = json.Unmarshal(body, &esResp)
|
||||
if err != nil {
|
||||
return Response{}, err
|
||||
}
|
||||
json.Unmarshal(body, &esResp.Raw)
|
||||
}
|
||||
|
||||
if req.api == "_bulk" && esResp.Errors {
|
||||
@ -377,7 +382,7 @@ func (r *Request) Url() string {
|
||||
}
|
||||
|
||||
// XXX : for indexing documents using the normal (non bulk) API
|
||||
if len(r.api) == 0 && len(r.id) > 0 {
|
||||
if len(r.id) > 0 {
|
||||
path += "/" + r.id
|
||||
}
|
||||
|
||||
@ -423,3 +428,61 @@ func (b Bucket) Aggregation(name string) Aggregation {
|
||||
return Aggregation{}
|
||||
}
|
||||
}
|
||||
|
||||
// PutMapping registers a specific mapping for one or more types in one or more indexes
|
||||
func (c *Connection) PutMapping(typeName string, mapping map[string]interface{}, indexes []string) (Response, error) {
|
||||
|
||||
r := Request{
|
||||
Conn: c,
|
||||
Query: mapping,
|
||||
IndexList: indexes,
|
||||
method: "PUT",
|
||||
api: "_mappings/" + typeName,
|
||||
}
|
||||
|
||||
return r.Run()
|
||||
}
|
||||
|
||||
func (c *Connection) GetMapping(types []string, indexes []string) (Response, error) {
|
||||
|
||||
r := Request{
|
||||
Conn: c,
|
||||
IndexList: indexes,
|
||||
method: "GET",
|
||||
api: "_mapping/" + strings.Join(types, ","),
|
||||
}
|
||||
|
||||
return r.Run()
|
||||
}
|
||||
|
||||
// IndicesExist checks whether index (or indices) exist on the server
|
||||
func (c *Connection) IndicesExist(indexes []string) (bool, error) {
|
||||
|
||||
r := Request{
|
||||
Conn: c,
|
||||
IndexList: indexes,
|
||||
method: "HEAD",
|
||||
}
|
||||
|
||||
resp, err := r.Run()
|
||||
|
||||
return resp.Status == 200, err
|
||||
}
|
||||
|
||||
func (c *Connection) Update(d Document, query map[string]interface{}, extraArgs url.Values) (Response, error) {
|
||||
r := Request{
|
||||
Conn: c,
|
||||
Query: query,
|
||||
IndexList: []string{d.Index.(string)},
|
||||
TypeList: []string{d.Type},
|
||||
ExtraArgs: extraArgs,
|
||||
method: "POST",
|
||||
api: "_update",
|
||||
}
|
||||
|
||||
if d.Id != nil {
|
||||
r.id = d.Id.(string)
|
||||
}
|
||||
|
||||
return r.Run()
|
||||
}
|
||||
|
190
goes_test.go
190
goes_test.go
@ -9,6 +9,7 @@ import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
@ -171,6 +172,7 @@ func (s *GoesTestSuite) TestDeleteIndexExistingIndex(c *C) {
|
||||
|
||||
expectedResponse := Response{}
|
||||
expectedResponse.Acknowledged = true
|
||||
resp.Raw = nil
|
||||
c.Assert(resp, DeepEquals, expectedResponse)
|
||||
}
|
||||
|
||||
@ -361,6 +363,7 @@ func (s *GoesTestSuite) TestIndexWithFieldsInStruct(c *C) {
|
||||
Version: 1,
|
||||
}
|
||||
|
||||
response.Raw = nil
|
||||
c.Assert(response, DeepEquals, expectedResponse)
|
||||
}
|
||||
|
||||
@ -425,6 +428,7 @@ func (s *GoesTestSuite) TestIndexIdDefined(c *C) {
|
||||
Version: 1,
|
||||
}
|
||||
|
||||
response.Raw = nil
|
||||
c.Assert(response, DeepEquals, expectedResponse)
|
||||
}
|
||||
|
||||
@ -494,6 +498,7 @@ func (s *GoesTestSuite) TestDelete(c *C) {
|
||||
// XXX : even after a DELETE the version number seems to be incremented
|
||||
Version: 2,
|
||||
}
|
||||
response.Raw = nil
|
||||
c.Assert(response, DeepEquals, expectedResponse)
|
||||
|
||||
response, err = conn.Delete(d, url.Values{})
|
||||
@ -507,6 +512,7 @@ func (s *GoesTestSuite) TestDelete(c *C) {
|
||||
// XXX : even after a DELETE the version number seems to be incremented
|
||||
Version: 3,
|
||||
}
|
||||
response.Raw = nil
|
||||
c.Assert(response, DeepEquals, expectedResponse)
|
||||
}
|
||||
|
||||
@ -566,6 +572,7 @@ func (s *GoesTestSuite) TestDeleteByQuery(c *C) {
|
||||
Id: "",
|
||||
Version: 0,
|
||||
}
|
||||
response.Raw = nil
|
||||
c.Assert(response, DeepEquals, expectedResponse)
|
||||
|
||||
//should be 0 docs after delete by query
|
||||
@ -612,6 +619,7 @@ func (s *GoesTestSuite) TestGet(c *C) {
|
||||
Source: source,
|
||||
}
|
||||
|
||||
response.Raw = nil
|
||||
c.Assert(response, DeepEquals, expectedResponse)
|
||||
|
||||
fields := make(url.Values, 1)
|
||||
@ -630,6 +638,7 @@ func (s *GoesTestSuite) TestGet(c *C) {
|
||||
},
|
||||
}
|
||||
|
||||
response.Raw = nil
|
||||
c.Assert(response, DeepEquals, expectedResponse)
|
||||
}
|
||||
|
||||
@ -970,3 +979,184 @@ func (s *GoesTestSuite) TestAggregations(c *C) {
|
||||
c.Assert(age["count"], Equals, 2.0)
|
||||
c.Assert(age["sum"], Equals, 25.0+30.0)
|
||||
}
|
||||
|
||||
func (s *GoesTestSuite) TestPutMapping(c *C) {
|
||||
indexName := "testputmapping"
|
||||
docType := "tweet"
|
||||
|
||||
conn := NewConnection(ES_HOST, ES_PORT)
|
||||
// just in case
|
||||
conn.DeleteIndex(indexName)
|
||||
|
||||
_, err := conn.CreateIndex(indexName, map[string]interface{}{})
|
||||
c.Assert(err, IsNil)
|
||||
defer conn.DeleteIndex(indexName)
|
||||
|
||||
d := Document{
|
||||
Index: indexName,
|
||||
Type: docType,
|
||||
Fields: map[string]interface{}{
|
||||
"user": "foo",
|
||||
"message": "bar",
|
||||
},
|
||||
}
|
||||
|
||||
response, err := conn.Index(d, url.Values{})
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
mapping := map[string]interface{}{
|
||||
"tweet": map[string]interface{}{
|
||||
"properties": map[string]interface{}{
|
||||
"count": map[string]interface{}{
|
||||
"type": "integer",
|
||||
"index": "not_analyzed",
|
||||
"store": true,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
response, err = conn.PutMapping("tweet", mapping, []string{indexName})
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
c.Assert(response.Acknowledged, Equals, true)
|
||||
c.Assert(response.TimedOut, Equals, false)
|
||||
}
|
||||
|
||||
func (s *GoesTestSuite) TestIndicesExist(c *C) {
|
||||
indices := []string{"testindicesexist"}
|
||||
|
||||
conn := NewConnection(ES_HOST, ES_PORT)
|
||||
// just in case
|
||||
conn.DeleteIndex(indices[0])
|
||||
|
||||
exists, err := conn.IndicesExist(indices)
|
||||
c.Assert(exists, Equals, false)
|
||||
|
||||
_, err = conn.CreateIndex(indices[0], map[string]interface{}{})
|
||||
c.Assert(err, IsNil)
|
||||
defer conn.DeleteIndex(indices[0])
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
exists, err = conn.IndicesExist(indices)
|
||||
c.Assert(exists, Equals, true)
|
||||
|
||||
indices = append(indices, "nonexistent")
|
||||
exists, err = conn.IndicesExist(indices)
|
||||
c.Assert(exists, Equals, false)
|
||||
}
|
||||
|
||||
func (s *GoesTestSuite) TestUpdate(c *C) {
|
||||
indexName := "testupdate"
|
||||
docType := "tweet"
|
||||
docId := "1234"
|
||||
|
||||
conn := NewConnection(ES_HOST, ES_PORT)
|
||||
// just in case
|
||||
conn.DeleteIndex(indexName)
|
||||
|
||||
_, err := conn.CreateIndex(indexName, map[string]interface{}{})
|
||||
c.Assert(err, IsNil)
|
||||
defer conn.DeleteIndex(indexName)
|
||||
|
||||
d := Document{
|
||||
Index: indexName,
|
||||
Type: docType,
|
||||
Id: docId,
|
||||
Fields: map[string]interface{}{
|
||||
"user": "foo",
|
||||
"message": "bar",
|
||||
"counter": 1,
|
||||
},
|
||||
}
|
||||
|
||||
extraArgs := make(url.Values, 1)
|
||||
response, err := conn.Index(d, extraArgs)
|
||||
c.Assert(err, IsNil)
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
expectedResponse := Response{
|
||||
Index: indexName,
|
||||
Id: docId,
|
||||
Type: docType,
|
||||
Version: 1,
|
||||
}
|
||||
|
||||
response.Raw = nil
|
||||
c.Assert(response, DeepEquals, expectedResponse)
|
||||
|
||||
// Now that we have an ordinary document indexed, try updating it
|
||||
query := map[string]interface{}{
|
||||
"script": "ctx._source.counter += count",
|
||||
"params": map[string]interface{}{
|
||||
"count": 5,
|
||||
},
|
||||
"upsert": map[string]interface{}{
|
||||
"message": "candybar",
|
||||
"user": "admin",
|
||||
"counter": 1,
|
||||
},
|
||||
}
|
||||
|
||||
response, err = conn.Update(d, query, extraArgs)
|
||||
if err != nil && strings.Contains(err.(*SearchError).Msg, "dynamic scripting disabled") {
|
||||
c.Skip("Scripting is disabled on server, skipping this test")
|
||||
return
|
||||
}
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
c.Assert(err, Equals, nil)
|
||||
|
||||
response, err = conn.Get(indexName, docType, docId, url.Values{})
|
||||
c.Assert(err, Equals, nil)
|
||||
c.Assert(response.Source["counter"], Equals, float64(6))
|
||||
c.Assert(response.Source["user"], Equals, "foo")
|
||||
c.Assert(response.Source["message"], Equals, "bar")
|
||||
|
||||
// Test another document, non-existent
|
||||
docId = "555"
|
||||
d.Id = docId
|
||||
response, err = conn.Update(d, query, extraArgs)
|
||||
c.Assert(err, Equals, nil)
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
response, err = conn.Get(indexName, docType, docId, url.Values{})
|
||||
c.Assert(err, Equals, nil)
|
||||
c.Assert(response.Source["user"], Equals, "admin")
|
||||
c.Assert(response.Source["message"], Equals, "candybar")
|
||||
}
|
||||
|
||||
func (s *GoesTestSuite) TestGetMapping(c *C) {
|
||||
indexName := "testmapping"
|
||||
docType := "tweet"
|
||||
|
||||
conn := NewConnection(ES_HOST, ES_PORT)
|
||||
// just in case
|
||||
conn.DeleteIndex(indexName)
|
||||
|
||||
_, err := conn.CreateIndex(indexName, map[string]interface{}{})
|
||||
c.Assert(err, IsNil)
|
||||
defer conn.DeleteIndex(indexName)
|
||||
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
|
||||
response, err := conn.GetMapping([]string{docType}, []string{indexName})
|
||||
c.Assert(err, Equals, nil)
|
||||
c.Assert(len(response.Raw), Equals, 0)
|
||||
|
||||
d := Document{
|
||||
Index: indexName,
|
||||
Type: docType,
|
||||
Fields: map[string]interface{}{
|
||||
"user": "foo",
|
||||
"message": "bar",
|
||||
},
|
||||
}
|
||||
|
||||
response, err = conn.Index(d, url.Values{})
|
||||
c.Assert(err, IsNil)
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
response, err = conn.GetMapping([]string{docType}, []string{indexName})
|
||||
c.Assert(err, Equals, nil)
|
||||
c.Assert(len(response.Raw), Not(Equals), 0)
|
||||
}
|
||||
|
@ -88,6 +88,8 @@ type Response struct {
|
||||
ScrollId string `json:"_scroll_id"`
|
||||
|
||||
Aggregations map[string]Aggregation `json:"aggregations,omitempty"`
|
||||
|
||||
Raw map[string]interface{}
|
||||
}
|
||||
|
||||
// Represents an aggregation from response
|
||||
|
Loading…
Reference in New Issue
Block a user