diff --git a/.gitignore b/.gitignore index 9ed3b07..6788dd3 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ *.test +*.swp diff --git a/goes.go b/goes.go index 4439b1a..6d737c4 100644 --- a/goes.go +++ b/goes.go @@ -345,9 +345,14 @@ func (req *Request) Run() (Response, error) { } esResp := new(Response) - err = json.Unmarshal(body, &esResp) - if err != nil { - return Response{}, err + if req.method == "HEAD" { + esResp.Status = uint64(resp.StatusCode) + } else { + err = json.Unmarshal(body, &esResp) + if err != nil { + return Response{}, err + } + json.Unmarshal(body, &esResp.Raw) } if req.api == "_bulk" && esResp.Errors { @@ -377,7 +382,7 @@ func (r *Request) Url() string { } // XXX : for indexing documents using the normal (non bulk) API - if len(r.api) == 0 && len(r.id) > 0 { + if len(r.id) > 0 { path += "/" + r.id } @@ -423,3 +428,61 @@ func (b Bucket) Aggregation(name string) Aggregation { return Aggregation{} } } + +// PutMapping registers a specific mapping for one or more types in one or more indexes +func (c *Connection) PutMapping(typeName string, mapping map[string]interface{}, indexes []string) (Response, error) { + + r := Request{ + Conn: c, + Query: mapping, + IndexList: indexes, + method: "PUT", + api: "_mappings/" + typeName, + } + + return r.Run() +} + +func (c *Connection) GetMapping(types []string, indexes []string) (Response, error) { + + r := Request{ + Conn: c, + IndexList: indexes, + method: "GET", + api: "_mapping/" + strings.Join(types, ","), + } + + return r.Run() +} + +// IndicesExist checks whether index (or indices) exist on the server +func (c *Connection) IndicesExist(indexes []string) (bool, error) { + + r := Request{ + Conn: c, + IndexList: indexes, + method: "HEAD", + } + + resp, err := r.Run() + + return resp.Status == 200, err +} + +func (c *Connection) Update(d Document, query map[string]interface{}, extraArgs url.Values) (Response, error) { + r := Request{ + Conn: c, + Query: query, + IndexList: []string{d.Index.(string)}, + TypeList: []string{d.Type}, + ExtraArgs: extraArgs, + method: "POST", + api: "_update", + } + + if d.Id != nil { + r.id = d.Id.(string) + } + + return r.Run() +} diff --git a/goes_test.go b/goes_test.go index 49914e8..ec3f492 100644 --- a/goes_test.go +++ b/goes_test.go @@ -9,6 +9,7 @@ import ( "net/http" "net/url" "os" + "strings" "testing" "time" ) @@ -171,6 +172,7 @@ func (s *GoesTestSuite) TestDeleteIndexExistingIndex(c *C) { expectedResponse := Response{} expectedResponse.Acknowledged = true + resp.Raw = nil c.Assert(resp, DeepEquals, expectedResponse) } @@ -361,6 +363,7 @@ func (s *GoesTestSuite) TestIndexWithFieldsInStruct(c *C) { Version: 1, } + response.Raw = nil c.Assert(response, DeepEquals, expectedResponse) } @@ -425,6 +428,7 @@ func (s *GoesTestSuite) TestIndexIdDefined(c *C) { Version: 1, } + response.Raw = nil c.Assert(response, DeepEquals, expectedResponse) } @@ -494,6 +498,7 @@ func (s *GoesTestSuite) TestDelete(c *C) { // XXX : even after a DELETE the version number seems to be incremented Version: 2, } + response.Raw = nil c.Assert(response, DeepEquals, expectedResponse) response, err = conn.Delete(d, url.Values{}) @@ -507,6 +512,7 @@ func (s *GoesTestSuite) TestDelete(c *C) { // XXX : even after a DELETE the version number seems to be incremented Version: 3, } + response.Raw = nil c.Assert(response, DeepEquals, expectedResponse) } @@ -566,6 +572,7 @@ func (s *GoesTestSuite) TestDeleteByQuery(c *C) { Id: "", Version: 0, } + response.Raw = nil c.Assert(response, DeepEquals, expectedResponse) //should be 0 docs after delete by query @@ -612,6 +619,7 @@ func (s *GoesTestSuite) TestGet(c *C) { Source: source, } + response.Raw = nil c.Assert(response, DeepEquals, expectedResponse) fields := make(url.Values, 1) @@ -630,6 +638,7 @@ func (s *GoesTestSuite) TestGet(c *C) { }, } + response.Raw = nil c.Assert(response, DeepEquals, expectedResponse) } @@ -970,3 +979,184 @@ func (s *GoesTestSuite) TestAggregations(c *C) { c.Assert(age["count"], Equals, 2.0) c.Assert(age["sum"], Equals, 25.0+30.0) } + +func (s *GoesTestSuite) TestPutMapping(c *C) { + indexName := "testputmapping" + docType := "tweet" + + conn := NewConnection(ES_HOST, ES_PORT) + // just in case + conn.DeleteIndex(indexName) + + _, err := conn.CreateIndex(indexName, map[string]interface{}{}) + c.Assert(err, IsNil) + defer conn.DeleteIndex(indexName) + + d := Document{ + Index: indexName, + Type: docType, + Fields: map[string]interface{}{ + "user": "foo", + "message": "bar", + }, + } + + response, err := conn.Index(d, url.Values{}) + c.Assert(err, IsNil) + + mapping := map[string]interface{}{ + "tweet": map[string]interface{}{ + "properties": map[string]interface{}{ + "count": map[string]interface{}{ + "type": "integer", + "index": "not_analyzed", + "store": true, + }, + }, + }, + } + response, err = conn.PutMapping("tweet", mapping, []string{indexName}) + c.Assert(err, IsNil) + + c.Assert(response.Acknowledged, Equals, true) + c.Assert(response.TimedOut, Equals, false) +} + +func (s *GoesTestSuite) TestIndicesExist(c *C) { + indices := []string{"testindicesexist"} + + conn := NewConnection(ES_HOST, ES_PORT) + // just in case + conn.DeleteIndex(indices[0]) + + exists, err := conn.IndicesExist(indices) + c.Assert(exists, Equals, false) + + _, err = conn.CreateIndex(indices[0], map[string]interface{}{}) + c.Assert(err, IsNil) + defer conn.DeleteIndex(indices[0]) + time.Sleep(200 * time.Millisecond) + + exists, err = conn.IndicesExist(indices) + c.Assert(exists, Equals, true) + + indices = append(indices, "nonexistent") + exists, err = conn.IndicesExist(indices) + c.Assert(exists, Equals, false) +} + +func (s *GoesTestSuite) TestUpdate(c *C) { + indexName := "testupdate" + docType := "tweet" + docId := "1234" + + conn := NewConnection(ES_HOST, ES_PORT) + // just in case + conn.DeleteIndex(indexName) + + _, err := conn.CreateIndex(indexName, map[string]interface{}{}) + c.Assert(err, IsNil) + defer conn.DeleteIndex(indexName) + + d := Document{ + Index: indexName, + Type: docType, + Id: docId, + Fields: map[string]interface{}{ + "user": "foo", + "message": "bar", + "counter": 1, + }, + } + + extraArgs := make(url.Values, 1) + response, err := conn.Index(d, extraArgs) + c.Assert(err, IsNil) + time.Sleep(200 * time.Millisecond) + + expectedResponse := Response{ + Index: indexName, + Id: docId, + Type: docType, + Version: 1, + } + + response.Raw = nil + c.Assert(response, DeepEquals, expectedResponse) + + // Now that we have an ordinary document indexed, try updating it + query := map[string]interface{}{ + "script": "ctx._source.counter += count", + "params": map[string]interface{}{ + "count": 5, + }, + "upsert": map[string]interface{}{ + "message": "candybar", + "user": "admin", + "counter": 1, + }, + } + + response, err = conn.Update(d, query, extraArgs) + if err != nil && strings.Contains(err.(*SearchError).Msg, "dynamic scripting disabled") { + c.Skip("Scripting is disabled on server, skipping this test") + return + } + time.Sleep(200 * time.Millisecond) + + c.Assert(err, Equals, nil) + + response, err = conn.Get(indexName, docType, docId, url.Values{}) + c.Assert(err, Equals, nil) + c.Assert(response.Source["counter"], Equals, float64(6)) + c.Assert(response.Source["user"], Equals, "foo") + c.Assert(response.Source["message"], Equals, "bar") + + // Test another document, non-existent + docId = "555" + d.Id = docId + response, err = conn.Update(d, query, extraArgs) + c.Assert(err, Equals, nil) + time.Sleep(200 * time.Millisecond) + + response, err = conn.Get(indexName, docType, docId, url.Values{}) + c.Assert(err, Equals, nil) + c.Assert(response.Source["user"], Equals, "admin") + c.Assert(response.Source["message"], Equals, "candybar") +} + +func (s *GoesTestSuite) TestGetMapping(c *C) { + indexName := "testmapping" + docType := "tweet" + + conn := NewConnection(ES_HOST, ES_PORT) + // just in case + conn.DeleteIndex(indexName) + + _, err := conn.CreateIndex(indexName, map[string]interface{}{}) + c.Assert(err, IsNil) + defer conn.DeleteIndex(indexName) + + time.Sleep(300 * time.Millisecond) + + response, err := conn.GetMapping([]string{docType}, []string{indexName}) + c.Assert(err, Equals, nil) + c.Assert(len(response.Raw), Equals, 0) + + d := Document{ + Index: indexName, + Type: docType, + Fields: map[string]interface{}{ + "user": "foo", + "message": "bar", + }, + } + + response, err = conn.Index(d, url.Values{}) + c.Assert(err, IsNil) + time.Sleep(200 * time.Millisecond) + + response, err = conn.GetMapping([]string{docType}, []string{indexName}) + c.Assert(err, Equals, nil) + c.Assert(len(response.Raw), Not(Equals), 0) +} diff --git a/structs.go b/structs.go index cfd043b..e397a9e 100644 --- a/structs.go +++ b/structs.go @@ -88,6 +88,8 @@ type Response struct { ScrollId string `json:"_scroll_id"` Aggregations map[string]Aggregation `json:"aggregations,omitempty"` + + Raw map[string]interface{} } // Represents an aggregation from response