From f8e5a2a4334255fc9aed045d6bea23e13b693727 Mon Sep 17 00:00:00 2001 From: Marin Bek Date: Tue, 11 Nov 2014 14:37:34 +0100 Subject: [PATCH 01/13] Support for PutMapping API --- goes.go | 14 ++++++++++++++ goes_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/goes.go b/goes.go index 4439b1a..52c4b8f 100644 --- a/goes.go +++ b/goes.go @@ -423,3 +423,17 @@ func (b Bucket) Aggregation(name string) Aggregation { return Aggregation{} } } + +// PutMapping registers a specific mapping for one or more types in one or more indexes +func (c *Connection) PutMapping(typeName string, mapping map[string]interface{}, indexes ...string) (Response, error) { + + r := Request{ + Conn: c, + Query: mapping, + IndexList: indexes, + method: "PUT", + api: "_mappings/" + typeName, + } + + return r.Run() +} diff --git a/goes_test.go b/goes_test.go index 49914e8..e223b54 100644 --- a/goes_test.go +++ b/goes_test.go @@ -970,3 +970,45 @@ func (s *GoesTestSuite) TestAggregations(c *C) { c.Assert(age["count"], Equals, 2.0) c.Assert(age["sum"], Equals, 25.0+30.0) } + +func (s *GoesTestSuite) TestPutMapping(c *C) { + indexName := "testputmapping" + docType := "tweet" + + conn := NewConnection(ES_HOST, ES_PORT) + // just in case + conn.DeleteIndex(indexName) + + _, err := conn.CreateIndex(indexName, map[string]interface{}{}) + c.Assert(err, IsNil) + defer conn.DeleteIndex(indexName) + + d := Document{ + Index: indexName, + Type: docType, + Fields: map[string]interface{}{ + "user": "foo", + "message": "bar", + }, + } + + response, err := conn.Index(d, url.Values{}) + c.Assert(err, IsNil) + + mapping := map[string]interface{}{ + "tweet": map[string]interface{}{ + "properties": map[string]interface{}{ + "count": map[string]interface{}{ + "type": "integer", + "index": "not_analyzed", + "store": true, + }, + }, + }, + } + response, err = conn.PutMapping("tweet", mapping, indexName) + c.Assert(err, IsNil) + + c.Assert(response.Acknowledged, Equals, true) + c.Assert(response.TimedOut, Equals, false) +} From e5d37bf9452e0894bb4ad1bd084420ffecc79c03 Mon Sep 17 00:00:00 2001 From: Marin Bek Date: Tue, 11 Nov 2014 15:37:07 +0100 Subject: [PATCH 02/13] HEAD requests have no body, only HTTP response code, so we need to act accordingly --- goes.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/goes.go b/goes.go index 52c4b8f..beb135e 100644 --- a/goes.go +++ b/goes.go @@ -345,10 +345,14 @@ func (req *Request) Run() (Response, error) { } esResp := new(Response) - err = json.Unmarshal(body, &esResp) - if err != nil { - return Response{}, err - } + if req.method == "HEAD" { + esResp.Status = uint64(resp.StatusCode) + } else { + err = json.Unmarshal(body, &esResp) + if err != nil { + return Response{}, err + } + } if req.api == "_bulk" && esResp.Errors { for _, item := range esResp.Items { From 92d2eb17fa1d057286b80f2c21e8f23d3b575318 Mon Sep 17 00:00:00 2001 From: Marin Bek Date: Tue, 11 Nov 2014 15:37:50 +0100 Subject: [PATCH 03/13] Test IndicesExist --- goes_test.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/goes_test.go b/goes_test.go index e223b54..23d0b43 100644 --- a/goes_test.go +++ b/goes_test.go @@ -1012,3 +1012,24 @@ func (s *GoesTestSuite) TestPutMapping(c *C) { c.Assert(response.Acknowledged, Equals, true) c.Assert(response.TimedOut, Equals, false) } + +func (s *GoesTestSuite) TestIndicesExist(c *C) { + indexName := "testindicesexist" + + conn := NewConnection(ES_HOST, ES_PORT) + // just in case + conn.DeleteIndex(indexName) + + exists, err := conn.IndicesExist(indexName) + c.Assert(exists, Equals, false) + + _, err = conn.CreateIndex(indexName, map[string]interface{}{}) + c.Assert(err, IsNil) + defer conn.DeleteIndex(indexName) + + exists, err = conn.IndicesExist(indexName) + c.Assert(exists, Equals, true) + + exists, err = conn.IndicesExist(indexName, "nonexistent") + c.Assert(exists, Equals, false) +} From ebf3324c9029b210e264b0cb41f0185f926452fa Mon Sep 17 00:00:00 2001 From: Marin Bek Date: Tue, 11 Nov 2014 15:39:07 +0100 Subject: [PATCH 04/13] IndicesExist implementation --- goes.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/goes.go b/goes.go index beb135e..1cf3b38 100644 --- a/goes.go +++ b/goes.go @@ -441,3 +441,17 @@ func (c *Connection) PutMapping(typeName string, mapping map[string]interface{}, return r.Run() } + +// IndicesExist checks whether index (or indices) exist on the server +func (c *Connection) IndicesExist(indexes ...string) (bool, error) { + + r := Request{ + Conn: c, + IndexList: indexes, + method: "HEAD", + } + + resp, err := r.Run() + + return resp.Status == 200, err +} From fd03304d0bd62a453c80dcad9173473a404a46a8 Mon Sep 17 00:00:00 2001 From: Marin Bek Date: Tue, 11 Nov 2014 15:39:36 +0100 Subject: [PATCH 05/13] go fmt --- goes.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/goes.go b/goes.go index 1cf3b38..b162306 100644 --- a/goes.go +++ b/goes.go @@ -345,14 +345,14 @@ func (req *Request) Run() (Response, error) { } esResp := new(Response) - if req.method == "HEAD" { - esResp.Status = uint64(resp.StatusCode) - } else { - err = json.Unmarshal(body, &esResp) - if err != nil { - return Response{}, err - } - } + if req.method == "HEAD" { + esResp.Status = uint64(resp.StatusCode) + } else { + err = json.Unmarshal(body, &esResp) + if err != nil { + return Response{}, err + } + } if req.api == "_bulk" && esResp.Errors { for _, item := range esResp.Items { From 8c049cb79c0d161c9e9bf630c4549f74af5c6314 Mon Sep 17 00:00:00 2001 From: Marin Bek Date: Tue, 11 Nov 2014 15:39:58 +0100 Subject: [PATCH 06/13] ignore vim files --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 9ed3b07..6788dd3 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ *.test +*.swp From 76ad99bb81a085878d16343a2e2b1fcfec1fda32 Mon Sep 17 00:00:00 2001 From: Marin Bek Date: Wed, 12 Nov 2014 17:34:58 +0100 Subject: [PATCH 07/13] Update API implementation --- goes.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/goes.go b/goes.go index b162306..d5dfbab 100644 --- a/goes.go +++ b/goes.go @@ -381,7 +381,7 @@ func (r *Request) Url() string { } // XXX : for indexing documents using the normal (non bulk) API - if len(r.api) == 0 && len(r.id) > 0 { + if len(r.id) > 0 { path += "/" + r.id } @@ -455,3 +455,18 @@ func (c *Connection) IndicesExist(indexes ...string) (bool, error) { return resp.Status == 200, err } + +func (c *Connection) Update(d Document, query map[string]interface{}, extraArgs url.Values) (Response, error) { + r := Request{ + Conn: c, + Query: query, + IndexList: []string{d.Index.(string)}, + TypeList: []string{d.Type}, + ExtraArgs: extraArgs, + method: "POST", + api: "_update", + id: d.Id.(string), + } + + return r.Run() +} From 36f0db102e25b34dd12f3f3fa5dccd2601a86e62 Mon Sep 17 00:00:00 2001 From: Marin Bek Date: Wed, 12 Nov 2014 17:35:37 +0100 Subject: [PATCH 08/13] Update test --- goes_test.go | 77 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/goes_test.go b/goes_test.go index 23d0b43..206c157 100644 --- a/goes_test.go +++ b/goes_test.go @@ -9,6 +9,7 @@ import ( "net/http" "net/url" "os" + "strings" "testing" "time" ) @@ -1033,3 +1034,79 @@ func (s *GoesTestSuite) TestIndicesExist(c *C) { exists, err = conn.IndicesExist(indexName, "nonexistent") c.Assert(exists, Equals, false) } + +func (s *GoesTestSuite) TestUpdate(c *C) { + indexName := "testupdate" + docType := "tweet" + docId := "1234" + + conn := NewConnection(ES_HOST, ES_PORT) + // just in case + conn.DeleteIndex(indexName) + + _, err := conn.CreateIndex(indexName, map[string]interface{}{}) + c.Assert(err, IsNil) + defer conn.DeleteIndex(indexName) + + d := Document{ + Index: indexName, + Type: docType, + Id: docId, + Fields: map[string]interface{}{ + "user": "foo", + "message": "bar", + "counter": 1, + }, + } + + extraArgs := make(url.Values, 1) + response, err := conn.Index(d, extraArgs) + c.Assert(err, IsNil) + + expectedResponse := Response{ + Index: indexName, + Id: docId, + Type: docType, + Version: 1, + } + + c.Assert(response, DeepEquals, expectedResponse) + + // Now that we have an ordinary document indexed, try updating it + query := map[string]interface{}{ + "script": "ctx._source.counter += count", + "params": map[string]interface{}{ + "count": 5, + }, + "upsert": map[string]interface{}{ + "message": "candybar", + "user": "admin", + "counter": 1, + }, + } + + response, err = conn.Update(d, query, extraArgs) + if err != nil && strings.Contains(err.(*SearchError).Msg, "dynamic scripting disabled") { + c.Skip("Scripting is disabled on server, skipping this test") + return + } + + c.Assert(err, Equals, nil) + + response, err = conn.Get(indexName, docType, docId, url.Values{}) + c.Assert(err, Equals, nil) + c.Assert(response.Source["counter"], Equals, float64(6)) + c.Assert(response.Source["user"], Equals, "foo") + c.Assert(response.Source["message"], Equals, "bar") + + // Test another document, non-existent + docId = "555" + d.Id = docId + response, err = conn.Update(d, query, extraArgs) + c.Assert(err, Equals, nil) + + response, err = conn.Get(indexName, docType, docId, url.Values{}) + c.Assert(err, Equals, nil) + c.Assert(response.Source["user"], Equals, "admin") + c.Assert(response.Source["message"], Equals, "candybar") +} From 589282a1895d2a2592acc8684a46e34133b1c88c Mon Sep 17 00:00:00 2001 From: Marin Bek Date: Thu, 13 Nov 2014 09:21:27 +0100 Subject: [PATCH 09/13] Unmarshal whole response into Raw field on Response for APIs that return fully dynamic format, like _mapping --- goes.go | 1 + goes_test.go | 9 +++++++++ structs.go | 2 ++ 3 files changed, 12 insertions(+) diff --git a/goes.go b/goes.go index d5dfbab..84d171a 100644 --- a/goes.go +++ b/goes.go @@ -352,6 +352,7 @@ func (req *Request) Run() (Response, error) { if err != nil { return Response{}, err } + json.Unmarshal(body, &esResp.Raw) } if req.api == "_bulk" && esResp.Errors { diff --git a/goes_test.go b/goes_test.go index 206c157..9a33f27 100644 --- a/goes_test.go +++ b/goes_test.go @@ -172,6 +172,7 @@ func (s *GoesTestSuite) TestDeleteIndexExistingIndex(c *C) { expectedResponse := Response{} expectedResponse.Acknowledged = true + resp.Raw = nil c.Assert(resp, DeepEquals, expectedResponse) } @@ -362,6 +363,7 @@ func (s *GoesTestSuite) TestIndexWithFieldsInStruct(c *C) { Version: 1, } + response.Raw = nil c.Assert(response, DeepEquals, expectedResponse) } @@ -426,6 +428,7 @@ func (s *GoesTestSuite) TestIndexIdDefined(c *C) { Version: 1, } + response.Raw = nil c.Assert(response, DeepEquals, expectedResponse) } @@ -495,6 +498,7 @@ func (s *GoesTestSuite) TestDelete(c *C) { // XXX : even after a DELETE the version number seems to be incremented Version: 2, } + response.Raw = nil c.Assert(response, DeepEquals, expectedResponse) response, err = conn.Delete(d, url.Values{}) @@ -508,6 +512,7 @@ func (s *GoesTestSuite) TestDelete(c *C) { // XXX : even after a DELETE the version number seems to be incremented Version: 3, } + response.Raw = nil c.Assert(response, DeepEquals, expectedResponse) } @@ -567,6 +572,7 @@ func (s *GoesTestSuite) TestDeleteByQuery(c *C) { Id: "", Version: 0, } + response.Raw = nil c.Assert(response, DeepEquals, expectedResponse) //should be 0 docs after delete by query @@ -613,6 +619,7 @@ func (s *GoesTestSuite) TestGet(c *C) { Source: source, } + response.Raw = nil c.Assert(response, DeepEquals, expectedResponse) fields := make(url.Values, 1) @@ -631,6 +638,7 @@ func (s *GoesTestSuite) TestGet(c *C) { }, } + response.Raw = nil c.Assert(response, DeepEquals, expectedResponse) } @@ -1070,6 +1078,7 @@ func (s *GoesTestSuite) TestUpdate(c *C) { Version: 1, } + response.Raw = nil c.Assert(response, DeepEquals, expectedResponse) // Now that we have an ordinary document indexed, try updating it diff --git a/structs.go b/structs.go index cfd043b..e397a9e 100644 --- a/structs.go +++ b/structs.go @@ -88,6 +88,8 @@ type Response struct { ScrollId string `json:"_scroll_id"` Aggregations map[string]Aggregation `json:"aggregations,omitempty"` + + Raw map[string]interface{} } // Represents an aggregation from response From 08d26b69e0df37500f7c83ba0cc495c91530d30d Mon Sep 17 00:00:00 2001 From: Marin Bek Date: Thu, 13 Nov 2014 09:22:11 +0100 Subject: [PATCH 10/13] GetMapping implementation --- goes.go | 12 ++++++++++++ goes_test.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/goes.go b/goes.go index 84d171a..9216818 100644 --- a/goes.go +++ b/goes.go @@ -443,6 +443,18 @@ func (c *Connection) PutMapping(typeName string, mapping map[string]interface{}, return r.Run() } +func (c *Connection) GetMapping(types []string, indexes ...string) (Response, error) { + + r := Request{ + Conn: c, + IndexList: indexes, + method: "GET", + api: "_mapping/" + strings.Join(types, ","), + } + + return r.Run() +} + // IndicesExist checks whether index (or indices) exist on the server func (c *Connection) IndicesExist(indexes ...string) (bool, error) { diff --git a/goes_test.go b/goes_test.go index 9a33f27..21a88ff 100644 --- a/goes_test.go +++ b/goes_test.go @@ -1119,3 +1119,36 @@ func (s *GoesTestSuite) TestUpdate(c *C) { c.Assert(response.Source["user"], Equals, "admin") c.Assert(response.Source["message"], Equals, "candybar") } + +func (s *GoesTestSuite) TestGetMapping(c *C) { + indexName := "testmapping" + docType := "tweet" + + conn := NewConnection(ES_HOST, ES_PORT) + // just in case + conn.DeleteIndex(indexName) + + _, err := conn.CreateIndex(indexName, map[string]interface{}{}) + c.Assert(err, IsNil) + defer conn.DeleteIndex(indexName) + + response, err := conn.GetMapping([]string{docType}, indexName) + c.Assert(err, Equals, nil) + c.Assert(len(response.Raw), Equals, 0) + + d := Document{ + Index: indexName, + Type: docType, + Fields: map[string]interface{}{ + "user": "foo", + "message": "bar", + }, + } + + response, err = conn.Index(d, url.Values{}) + c.Assert(err, IsNil) + + response, err = conn.GetMapping([]string{docType}, indexName) + c.Assert(err, Equals, nil) + c.Assert(len(response.Raw), Not(Equals), 0) +} From 1453d3c31c24925f94b4de48246b42b05ece098f Mon Sep 17 00:00:00 2001 From: Marin Bek Date: Sat, 15 Nov 2014 23:08:40 +0100 Subject: [PATCH 11/13] Give ES some time to work in tests --- goes_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/goes_test.go b/goes_test.go index 21a88ff..a0026e3 100644 --- a/goes_test.go +++ b/goes_test.go @@ -1035,6 +1035,7 @@ func (s *GoesTestSuite) TestIndicesExist(c *C) { _, err = conn.CreateIndex(indexName, map[string]interface{}{}) c.Assert(err, IsNil) defer conn.DeleteIndex(indexName) + time.Sleep(200 * time.Millisecond) exists, err = conn.IndicesExist(indexName) c.Assert(exists, Equals, true) @@ -1070,6 +1071,7 @@ func (s *GoesTestSuite) TestUpdate(c *C) { extraArgs := make(url.Values, 1) response, err := conn.Index(d, extraArgs) c.Assert(err, IsNil) + time.Sleep(200 * time.Millisecond) expectedResponse := Response{ Index: indexName, @@ -1099,6 +1101,7 @@ func (s *GoesTestSuite) TestUpdate(c *C) { c.Skip("Scripting is disabled on server, skipping this test") return } + time.Sleep(200 * time.Millisecond) c.Assert(err, Equals, nil) @@ -1113,6 +1116,7 @@ func (s *GoesTestSuite) TestUpdate(c *C) { d.Id = docId response, err = conn.Update(d, query, extraArgs) c.Assert(err, Equals, nil) + time.Sleep(200 * time.Millisecond) response, err = conn.Get(indexName, docType, docId, url.Values{}) c.Assert(err, Equals, nil) @@ -1132,6 +1136,8 @@ func (s *GoesTestSuite) TestGetMapping(c *C) { c.Assert(err, IsNil) defer conn.DeleteIndex(indexName) + time.Sleep(300 * time.Millisecond) + response, err := conn.GetMapping([]string{docType}, indexName) c.Assert(err, Equals, nil) c.Assert(len(response.Raw), Equals, 0) @@ -1147,6 +1153,7 @@ func (s *GoesTestSuite) TestGetMapping(c *C) { response, err = conn.Index(d, url.Values{}) c.Assert(err, IsNil) + time.Sleep(200 * time.Millisecond) response, err = conn.GetMapping([]string{docType}, indexName) c.Assert(err, Equals, nil) From 4c9459a02b2004b093e52bd402769010f5082340 Mon Sep 17 00:00:00 2001 From: Marin Bek Date: Mon, 17 Nov 2014 07:25:48 +0100 Subject: [PATCH 12/13] Use slices instead of variadic function --- goes.go | 6 +++--- goes_test.go | 21 +++++++++++---------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/goes.go b/goes.go index 9216818..5629c0c 100644 --- a/goes.go +++ b/goes.go @@ -430,7 +430,7 @@ func (b Bucket) Aggregation(name string) Aggregation { } // PutMapping registers a specific mapping for one or more types in one or more indexes -func (c *Connection) PutMapping(typeName string, mapping map[string]interface{}, indexes ...string) (Response, error) { +func (c *Connection) PutMapping(typeName string, mapping map[string]interface{}, indexes []string) (Response, error) { r := Request{ Conn: c, @@ -443,7 +443,7 @@ func (c *Connection) PutMapping(typeName string, mapping map[string]interface{}, return r.Run() } -func (c *Connection) GetMapping(types []string, indexes ...string) (Response, error) { +func (c *Connection) GetMapping(types []string, indexes []string) (Response, error) { r := Request{ Conn: c, @@ -456,7 +456,7 @@ func (c *Connection) GetMapping(types []string, indexes ...string) (Response, er } // IndicesExist checks whether index (or indices) exist on the server -func (c *Connection) IndicesExist(indexes ...string) (bool, error) { +func (c *Connection) IndicesExist(indexes []string) (bool, error) { r := Request{ Conn: c, diff --git a/goes_test.go b/goes_test.go index a0026e3..ec3f492 100644 --- a/goes_test.go +++ b/goes_test.go @@ -1015,7 +1015,7 @@ func (s *GoesTestSuite) TestPutMapping(c *C) { }, }, } - response, err = conn.PutMapping("tweet", mapping, indexName) + response, err = conn.PutMapping("tweet", mapping, []string{indexName}) c.Assert(err, IsNil) c.Assert(response.Acknowledged, Equals, true) @@ -1023,24 +1023,25 @@ func (s *GoesTestSuite) TestPutMapping(c *C) { } func (s *GoesTestSuite) TestIndicesExist(c *C) { - indexName := "testindicesexist" + indices := []string{"testindicesexist"} conn := NewConnection(ES_HOST, ES_PORT) // just in case - conn.DeleteIndex(indexName) + conn.DeleteIndex(indices[0]) - exists, err := conn.IndicesExist(indexName) + exists, err := conn.IndicesExist(indices) c.Assert(exists, Equals, false) - _, err = conn.CreateIndex(indexName, map[string]interface{}{}) + _, err = conn.CreateIndex(indices[0], map[string]interface{}{}) c.Assert(err, IsNil) - defer conn.DeleteIndex(indexName) + defer conn.DeleteIndex(indices[0]) time.Sleep(200 * time.Millisecond) - exists, err = conn.IndicesExist(indexName) + exists, err = conn.IndicesExist(indices) c.Assert(exists, Equals, true) - exists, err = conn.IndicesExist(indexName, "nonexistent") + indices = append(indices, "nonexistent") + exists, err = conn.IndicesExist(indices) c.Assert(exists, Equals, false) } @@ -1138,7 +1139,7 @@ func (s *GoesTestSuite) TestGetMapping(c *C) { time.Sleep(300 * time.Millisecond) - response, err := conn.GetMapping([]string{docType}, indexName) + response, err := conn.GetMapping([]string{docType}, []string{indexName}) c.Assert(err, Equals, nil) c.Assert(len(response.Raw), Equals, 0) @@ -1155,7 +1156,7 @@ func (s *GoesTestSuite) TestGetMapping(c *C) { c.Assert(err, IsNil) time.Sleep(200 * time.Millisecond) - response, err = conn.GetMapping([]string{docType}, indexName) + response, err = conn.GetMapping([]string{docType}, []string{indexName}) c.Assert(err, Equals, nil) c.Assert(len(response.Raw), Not(Equals), 0) } From 6c041327be7e0de883a565f5ca646e22da503a17 Mon Sep 17 00:00:00 2001 From: Marin Bek Date: Mon, 17 Nov 2014 17:04:48 +0100 Subject: [PATCH 13/13] Only set Id on document if it was given --- goes.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/goes.go b/goes.go index 5629c0c..6d737c4 100644 --- a/goes.go +++ b/goes.go @@ -478,8 +478,11 @@ func (c *Connection) Update(d Document, query map[string]interface{}, extraArgs ExtraArgs: extraArgs, method: "POST", api: "_update", - id: d.Id.(string), } + if d.Id != nil { + r.id = d.Id.(string) + } + return r.Run() }