From 5db99a78585466a9e6a3c9ad62301aa75cb4bc0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Je=CC=81ro=CC=82me=20Renard?= Date: Sat, 15 Jun 2013 08:18:48 +0200 Subject: [PATCH] Initial import --- .gitignore | 1 + LICENSE | 27 +++ Makefile | 10 + TODO | 1 + dependencies.txt | 1 + example_test.go | 143 ++++++++++++ goes.go | 288 ++++++++++++++++++++++++ goes_test.go | 553 +++++++++++++++++++++++++++++++++++++++++++++++ structs.go | 142 ++++++++++++ 9 files changed, 1166 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 Makefile create mode 100644 TODO create mode 100644 dependencies.txt create mode 100644 example_test.go create mode 100644 goes.go create mode 100644 goes_test.go create mode 100644 structs.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9ed3b07 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.test diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..cc3fe42 --- /dev/null +++ b/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2013 Belogik. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Belogik nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..bfcb413 --- /dev/null +++ b/Makefile @@ -0,0 +1,10 @@ +help: + @echo "Available targets:" + @echo "- test: run tests" + @echo "- installdependencies: installs dependencies declared in dependencies.txt" + +installdependencies: + cat dependencies.txt | xargs go get + +test: installdependencies + go test -i && go test diff --git a/TODO b/TODO new file mode 100644 index 0000000..7431ef8 --- /dev/null +++ b/TODO @@ -0,0 +1 @@ +- Add Gzip support to bulk data to save bandwith diff --git a/dependencies.txt b/dependencies.txt new file mode 100644 index 0000000..5e35e09 --- /dev/null +++ b/dependencies.txt @@ -0,0 +1 @@ +launchpad.net/gocheck diff --git a/example_test.go b/example_test.go new file mode 100644 index 0000000..c800734 --- /dev/null +++ b/example_test.go @@ -0,0 +1,143 @@ +// Copyright 2013 Belogik. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package goes_test + +import ( + "fmt" + "goes" + "net/url" +) + +func ExampleConnection_CreateIndex() { + conn := goes.NewConnection("localhost", "9200") + + mapping := map[string]interface{}{ + "settings": map[string]interface{}{ + "index.number_of_shards": 1, + "index.number_of_replicas": 0, + }, + "mappings": map[string]interface{}{ + "_default_": map[string]interface{}{ + "_source": map[string]interface{}{ + "enabled": true, + }, + "_all": map[string]interface{}{ + "enabled": false, + }, + }, + }, + } + + resp, err := conn.CreateIndex("test", mapping) + + if err != nil { + panic(err) + } + + fmt.Printf("%s", resp) +} + +func ExampleConnection_DeleteIndex() { + conn := goes.NewConnection("localhost", "9200") + resp, err := conn.DeleteIndex("yourinde") + + if err != nil { + panic(err) + } + + fmt.Printf("%s", resp) +} + +func ExampleConnection_RefreshIndex() { + conn := goes.NewConnection("localhost", "9200") + resp, err := conn.RefreshIndex("yourindex") + + if err != nil { + panic(err) + } + + fmt.Printf("%s", resp) +} + +func ExampleConnection_Search() { + conn := goes.NewConnection("localhost", "9200") + + var query = map[string]interface{}{ + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "must": map[string]interface{}{ + "match_all": map[string]interface{}{}, + }, + }, + }, + "from": 0, + "size": 100, + "fields": []string{"onefield"}, + "filter": map[string]interface{}{ + "range": map[string]interface{}{ + "somefield": map[string]interface{}{ + "from": "some date", + "to": "some date", + "include_lower": false, + "include_upper": false, + }, + }, + }, + } + + searchResults, err := conn.Search(query, []string{"someindex"}, []string{""}) + + if err != nil { + panic(err) + } + + fmt.Printf("%s", searchResults) +} + +func ExampleConnection_Index() { + conn := goes.NewConnection("localhost", "9200") + + d := goes.Document{ + Index: "twitter", + Type: "tweet", + Fields: map[string]interface{}{ + "user": "foo", + "message": "bar", + }, + } + + extraArgs := make(url.Values, 1) + extraArgs.Set("ttl", "86400000") + + response, err := conn.Index(d, extraArgs) + + if err != nil { + panic(err) + } + + fmt.Printf("%s", response) +} + +func ExampleConnection_Delete() { + conn := goes.NewConnection("localhost", "9200") + + //[create index, index document ...] + + d := goes.Document{ + Index: "twitter", + Type: "tweet", + Id: "1", + Fields: map[string]interface{}{ + "user": "foo", + }, + } + + response, err := conn.Delete(d, url.Values{}) + if err != nil { + panic(err) + } + + fmt.Printf("%s", response) +} diff --git a/goes.go b/goes.go new file mode 100644 index 0000000..9637596 --- /dev/null +++ b/goes.go @@ -0,0 +1,288 @@ +// Copyright 2013 Belogik. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package goes + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strings" +) + +const ( + BULK_COMMAND_INDEX = "index" + BULK_COMMAND_DELETE = "delete" +) + +func (err *SearchError) Error() string { + return fmt.Sprintf("[%d] %s", err.StatusCode, err.Msg) +} + +// NewConnection initiates a new Connection to an elasticsearch server +// +// This function is pretty useless for now but might be useful in a near future +// if wee need more features like connection pooling or load balancing. +func NewConnection(host string, port string) *Connection { + return &Connection{host, port} +} + +// CreateIndex creates a new index represented by a name and a mapping +func (c *Connection) CreateIndex(name string, mapping map[string]interface{}) (Response, error) { + r := Request{ + Conn: c, + Query: mapping, + IndexList: []string{name}, + method: "PUT", + } + + return r.Run() +} + +// DeleteIndex deletes an index represented by a name +func (c *Connection) DeleteIndex(name string) (Response, error) { + r := Request{ + Conn: c, + IndexList: []string{name}, + method: "DELETE", + } + + return r.Run() +} + +// RefreshIndex refreshes an index represented by a name +func (c *Connection) RefreshIndex(name string) (Response, error) { + r := Request{ + Conn: c, + IndexList: []string{name}, + method: "POST", + api: "_refresh", + } + + return r.Run() +} + +func (c *Connection) FetchStats() (Response, error) { + r := Request{ + Conn: c, + method: "GET", + api: "_stats", + } + + return r.Run() +} + +// Bulk adds multiple documents in bulk mode to the index for a given type +func (c *Connection) BulkSend(index string, documents []Document) (Response, error) { + // We do not generate a traditionnal JSON here (often a one liner) + // Elasticsearch expects one line of JSON per line (EOL = \n) + // plus an extra \n at the very end of the document + // + // More informations about the Bulk JSON format for Elasticsearch: + // + // - http://www.elasticsearch.org/guide/reference/api/bulk.html + // + // This is quite annoying for us as we can not use the simple JSON + // Marshaler available in Run(). + // + // We have to generate this special JSON by ourselves which leads to + // the code below. + // + // I know it is unreadable I must find an elegant way to fix this. + + bulkData := []byte{} + for _, doc := range documents { + header := map[string]interface{}{ + doc.BulkCommand: map[string]interface{}{ + "_index": doc.Index, + "_type": doc.Type, + "_id": doc.Id, + }, + } + + temp, err := json.Marshal(header) + if err != nil { + return Response{}, err + } + + temp = append(temp, '\n') + bulkData = append(bulkData, temp[:]...) + + if len(doc.Fields) > 0 { + fields := map[string]interface{}{} + for fieldName, fieldValue := range doc.Fields { + fields[fieldName] = fieldValue + } + + temp, err = json.Marshal(fields) + if err != nil { + return Response{}, err + } + + temp = append(temp, '\n') + bulkData = append(bulkData, temp[:]...) + } + } + + r := Request{ + Conn: c, + IndexList: []string{index}, + method: "POST", + api: "_bulk", + bulkData: bulkData, + } + + return r.Run() +} + +// Search executes a search query against an index +func (c *Connection) Search(query map[string]interface{}, indexList []string, typeList []string) (Response, error) { + r := Request{ + Conn: c, + Query: query, + IndexList: indexList, + TypeList: typeList, + method: "POST", + api: "_search", + } + + return r.Run() +} + +// Get a typed document by its id +func (c *Connection) Get(index string, documentType string, id string, extraArgs url.Values) (Response, error) { + r := Request{ + Conn: c, + IndexList: []string{index}, + method: "GET", + api: documentType + "/" + id, + ExtraArgs: extraArgs, + } + + return r.Run() +} + +// Index indexes a Document +// The extraArgs is a list of url.Values that you can send to elasticsearch as +// URL arguments, for example, to control routing, ttl, version, op_type, etc. +func (c *Connection) Index(d Document, extraArgs url.Values) (Response, error) { + r := Request{ + Conn: c, + Query: d.Fields, + IndexList: []string{d.Index.(string)}, + TypeList: []string{d.Type}, + ExtraArgs: extraArgs, + method: "POST", + } + + if d.Id != nil { + r.method = "PUT" + r.id = d.Id.(string) + } + + return r.Run() +} + +// Delete deletes a Document d +// The extraArgs is a list of url.Values that you can send to elasticsearch as +// URL arguments, for example, to control routing. +func (c *Connection) Delete(d Document, extraArgs url.Values) (Response, error) { + r := Request{ + Conn: c, + IndexList: []string{d.Index.(string)}, + TypeList: []string{d.Type}, + ExtraArgs: extraArgs, + method: "DELETE", + id: d.Id.(string), + } + + return r.Run() +} + +// Run executes an elasticsearch Request. It converts data to Json, sends the +// request and return the Response obtained +func (req *Request) Run() (Response, error) { + postData := []byte{} + + // XXX : refactor this + if req.api == "_bulk" { + postData = req.bulkData + } else { + b, err := json.Marshal(req.Query) + if err != nil { + return Response{}, err + } + postData = b + } + + reader := bytes.NewReader(postData) + + client := http.DefaultClient + + newReq, err := http.NewRequest(req.method, req.Url(), reader) + if err != nil { + return Response{}, err + } + + if req.method == "POST" || req.method == "PUT" { + newReq.Header.Set("Content-Type", "application/x-www-form-urlencoded") + } + + resp, err := client.Do(newReq) + if err != nil { + return Response{}, err + } + + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return Response{}, err + } + + if resp.StatusCode > 201 && resp.StatusCode < 400 { + return Response{}, errors.New(string(body)) + } + + esResp := new(Response) + err = json.Unmarshal(body, &esResp) + if err != nil { + return Response{}, err + } + + if esResp.Error != "" { + return Response{}, &SearchError{esResp.Error, esResp.Status} + } + + return *esResp, nil +} + +// Url builds a Request for a URL +func (r *Request) Url() string { + path := "/" + strings.Join(r.IndexList, ",") + + if len(r.TypeList) > 0 { + path += "/" + strings.Join(r.TypeList, ",") + } + + // XXX : for indexing documents using the normal (non bulk) API + if len(r.api) == 0 && len(r.id) > 0 { + path += "/" + r.id + } + + path += "/" + r.api + + u := url.URL{ + Scheme: "http", + Host: fmt.Sprintf("%s:%s", r.Conn.Host, r.Conn.Port), + Path: path, + RawQuery: r.ExtraArgs.Encode(), + } + + return u.String() +} diff --git a/goes_test.go b/goes_test.go new file mode 100644 index 0000000..ee80df6 --- /dev/null +++ b/goes_test.go @@ -0,0 +1,553 @@ +// Copyright 2013 Belogik. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package goes + +import ( + . "launchpad.net/gocheck" + "net/url" + "os" + "testing" + "time" +) + +var ( + ES_HOST = "localhost" + ES_PORT = "9200" +) + +// Hook up gocheck into the gotest runner. +func Test(t *testing.T) { TestingT(t) } + +type GoesTestSuite struct{} + +var _ = Suite(&GoesTestSuite{}) + +func (s *GoesTestSuite) SetUpTest(c *C) { + h := os.Getenv("TEST_ELASTICSEARCH_HOST") + if h != "" { + ES_HOST = h + } + + p := os.Getenv("TEST_ELASTICSEARCH_PORT") + if p != "" { + ES_PORT = p + } +} + +func (s *GoesTestSuite) TestNewConnection(c *C) { + conn := NewConnection(ES_HOST, ES_PORT) + c.Assert(conn, DeepEquals, &Connection{ES_HOST, ES_PORT}) +} + +func (s *GoesTestSuite) TestUrl(c *C) { + conn := NewConnection(ES_HOST, ES_PORT) + + r := Request{ + Conn: conn, + Query: "q", + IndexList: []string{"i"}, + TypeList: []string{}, + method: "GET", + api: "_search", + } + + c.Assert(r.Url(), Equals, "http://"+ES_HOST+":"+ES_PORT+"/i/_search") + + r.IndexList = []string{"a", "b"} + c.Assert(r.Url(), Equals, "http://"+ES_HOST+":"+ES_PORT+"/a,b/_search") + + r.TypeList = []string{"c", "d"} + c.Assert(r.Url(), Equals, "http://"+ES_HOST+":"+ES_PORT+"/a,b/c,d/_search") + + r.ExtraArgs = make(url.Values, 1) + r.ExtraArgs.Set("version", "1") + c.Assert(r.Url(), Equals, "http://"+ES_HOST+":"+ES_PORT+"/a,b/c,d/_search?version=1") + + r.id = "1234" + r.api = "" + c.Assert(r.Url(), Equals, "http://"+ES_HOST+":"+ES_PORT+"/a,b/c,d/1234/?version=1") +} + +func (s *GoesTestSuite) TestEsDown(c *C) { + conn := NewConnection("a.b.c.d", "1234") + + var query = map[string]interface{}{"query": "foo"} + + r := Request{ + Conn: conn, + Query: query, + IndexList: []string{"i"}, + method: "GET", + api: "_search", + } + _, err := r.Run() + + c.Assert(err.Error(), Equals, "Get http://a.b.c.d:1234/i/_search: lookup a.b.c.d: no such host") +} + +func (s *GoesTestSuite) TestRunMissingIndex(c *C) { + conn := NewConnection(ES_HOST, ES_PORT) + + var query = map[string]interface{}{"query": "foo"} + + r := Request{ + Conn: conn, + Query: query, + IndexList: []string{"i"}, + method: "GET", + api: "_search", + } + _, err := r.Run() + + c.Assert(err.Error(), Equals, "[404] IndexMissingException[[i] missing]") +} + +func (s *GoesTestSuite) TestCreateIndex(c *C) { + indexName := "testcreateindexgoes" + + conn := NewConnection(ES_HOST, ES_PORT) + defer conn.DeleteIndex(indexName) + + mapping := map[string]interface{}{ + "settings": map[string]interface{}{ + "index.number_of_shards": 1, + "index.number_of_replicas": 0, + }, + "mappings": map[string]interface{}{ + "_default_": map[string]interface{}{ + "_source": map[string]interface{}{ + "enabled": false, + }, + "_all": map[string]interface{}{ + "enabled": false, + }, + }, + }, + } + + resp, err := conn.CreateIndex(indexName, mapping) + + c.Assert(err, IsNil) + c.Assert(resp.Ok, Equals, true) + c.Assert(resp.Acknowledged, Equals, true) +} + +func (s *GoesTestSuite) TestDeleteIndexInexistantIndex(c *C) { + conn := NewConnection(ES_HOST, ES_PORT) + resp, err := conn.DeleteIndex("foobar") + + c.Assert(err.Error(), Equals, "[404] IndexMissingException[[foobar] missing]") + c.Assert(resp, DeepEquals, Response{}) +} + +func (s *GoesTestSuite) TestDeleteIndexExistingIndex(c *C) { + conn := NewConnection(ES_HOST, ES_PORT) + + indexName := "testdeleteindexexistingindex" + + _, err := conn.CreateIndex(indexName, map[string]interface{}{}) + + c.Assert(err, IsNil) + + resp, err := conn.DeleteIndex(indexName) + c.Assert(err, IsNil) + + expectedResponse := Response{} + expectedResponse.Ok = true + expectedResponse.Acknowledged = true + c.Assert(resp, DeepEquals, expectedResponse) +} + +func (s *GoesTestSuite) TestRefreshIndex(c *C) { + conn := NewConnection(ES_HOST, ES_PORT) + indexName := "testrefreshindex" + + _, err := conn.CreateIndex(indexName, map[string]interface{}{}) + c.Assert(err, IsNil) + + resp, err := conn.RefreshIndex(indexName) + c.Assert(err, IsNil) + c.Assert(resp.Ok, Equals, true) + + _, err = conn.DeleteIndex(indexName) + c.Assert(err, IsNil) +} + +func (s *GoesTestSuite) TestBulkSend(c *C) { + indexName := "testbulkadd" + docType := "tweet" + + tweets := []Document{ + Document{ + Id: "123", + Index: nil, + Type: docType, + BulkCommand: BULK_COMMAND_INDEX, + Fields: map[string]interface{}{ + "user": "foo", + "message": "some foo message", + }, + }, + + Document{ + Id: nil, + Index: indexName, + Type: docType, + BulkCommand: BULK_COMMAND_INDEX, + Fields: map[string]interface{}{ + "user": "bar", + "message": "some bar message", + }, + }, + } + + conn := NewConnection(ES_HOST, ES_PORT) + + _, err := conn.CreateIndex(indexName, nil) + c.Assert(err, IsNil) + + response, err := conn.BulkSend(indexName, tweets) + i := Item{ + Ok: true, + Id: "123", + Type: docType, + Version: 1, + Index: indexName, + } + c.Assert(response.Items[0][BULK_COMMAND_INDEX], Equals, i) + c.Assert(err, IsNil) + + _, err = conn.RefreshIndex(indexName) + c.Assert(err, IsNil) + + var query = map[string]interface{}{ + "query": map[string]interface{}{ + "match_all": map[string]interface{}{}, + }, + } + + searchResults, err := conn.Search(query, []string{indexName}, []string{}) + c.Assert(err, IsNil) + + var expectedTotal uint64 = 2 + c.Assert(searchResults.Hits.Total, Equals, expectedTotal) + + extraDocId := "" + checked := 0 + for _, hit := range searchResults.Hits.Hits { + if hit.Source["user"] == "foo" { + c.Assert(hit.Id, Equals, "123") + checked++ + } + + if hit.Source["user"] == "bar" { + c.Assert(len(hit.Id) > 0, Equals, true) + extraDocId = hit.Id + checked++ + } + } + c.Assert(checked, Equals, 2) + + docToDelete := []Document{ + Document{ + Id: "123", + Index: indexName, + Type: docType, + BulkCommand: BULK_COMMAND_DELETE, + }, + Document{ + Id: extraDocId, + Index: indexName, + Type: docType, + BulkCommand: BULK_COMMAND_DELETE, + }, + } + + response, err = conn.BulkSend(indexName, docToDelete) + i = Item{ + Ok: true, + Id: "123", + Type: docType, + Version: 2, + Index: indexName, + } + c.Assert(response.Items[0][BULK_COMMAND_DELETE], Equals, i) + + c.Assert(err, IsNil) + + _, err = conn.RefreshIndex(indexName) + c.Assert(err, IsNil) + + searchResults, err = conn.Search(query, []string{indexName}, []string{}) + c.Assert(err, IsNil) + + expectedTotal = 0 + c.Assert(searchResults.Hits.Total, Equals, expectedTotal) + + _, err = conn.DeleteIndex(indexName) + c.Assert(err, IsNil) +} + +func (s *GoesTestSuite) TestFetchStats(c *C) { + conn := NewConnection(ES_HOST, ES_PORT) + indexName := "testfetchstats" + + _, err := conn.CreateIndex(indexName, map[string]interface{}{}) + c.Assert(err, IsNil) + + // we must wait for a bit otherwise ES crashes + time.Sleep(1 * time.Second) + + response, err := conn.FetchStats() + c.Assert(err, IsNil) + + c.Assert(response.All.Indices[indexName].Primaries["docs"].Count, Equals, 0) + + _, err = conn.DeleteIndex(indexName) + c.Assert(err, IsNil) +} + +func (s *GoesTestSuite) TestIndexIdDefined(c *C) { + indexName := "testindexiddefined" + docType := "tweet" + docId := "1234" + + conn := NewConnection(ES_HOST, ES_PORT) + // just in case + conn.DeleteIndex(indexName) + + _, err := conn.CreateIndex(indexName, map[string]interface{}{}) + c.Assert(err, IsNil) + defer conn.DeleteIndex(indexName) + + d := Document{ + Index: indexName, + Type: docType, + Id: docId, + Fields: map[string]interface{}{ + "user": "foo", + "message": "bar", + }, + } + + extraArgs := make(url.Values, 1) + extraArgs.Set("ttl", "86400000") + response, err := conn.Index(d, extraArgs) + c.Assert(err, IsNil) + + expectedResponse := Response{ + Ok: true, + Index: indexName, + Id: docId, + Type: docType, + Version: 1, + } + + c.Assert(response, DeepEquals, expectedResponse) +} + +func (s *GoesTestSuite) TestIndexIdNotDefined(c *C) { + indexName := "testindexidnotdefined" + docType := "tweet" + + conn := NewConnection(ES_HOST, ES_PORT) + // just in case + conn.DeleteIndex(indexName) + + _, err := conn.CreateIndex(indexName, map[string]interface{}{}) + c.Assert(err, IsNil) + defer conn.DeleteIndex(indexName) + + d := Document{ + Index: indexName, + Type: docType, + Fields: map[string]interface{}{ + "user": "foo", + "message": "bar", + }, + } + + response, err := conn.Index(d, url.Values{}) + c.Assert(err, IsNil) + + c.Assert(response.Ok, Equals, true) + c.Assert(response.Index, Equals, indexName) + c.Assert(response.Type, Equals, docType) + c.Assert(response.Version, Equals, 1) + c.Assert(response.Id != "", Equals, true) +} + +func (s *GoesTestSuite) TestDelete(c *C) { + indexName := "testdelete" + docType := "tweet" + docId := "1234" + + conn := NewConnection(ES_HOST, ES_PORT) + // just in case + conn.DeleteIndex(indexName) + + _, err := conn.CreateIndex(indexName, map[string]interface{}{}) + c.Assert(err, IsNil) + defer conn.DeleteIndex(indexName) + + d := Document{ + Index: indexName, + Type: docType, + Id: docId, + Fields: map[string]interface{}{ + "user": "foo", + }, + } + + _, err = conn.Index(d, url.Values{}) + c.Assert(err, IsNil) + + response, err := conn.Delete(d, url.Values{}) + c.Assert(err, IsNil) + + expectedResponse := Response{ + Ok: true, + Found: true, + Index: indexName, + Type: docType, + Id: docId, + // XXX : even after a DELETE the version number seems to be incremented + Version: 2, + } + c.Assert(response, DeepEquals, expectedResponse) + + response, err = conn.Delete(d, url.Values{}) + c.Assert(err, IsNil) + + expectedResponse = Response{ + Ok: true, + Found: false, + Index: indexName, + Type: docType, + Id: docId, + // XXX : even after a DELETE the version number seems to be incremented + Version: 3, + } + c.Assert(response, DeepEquals, expectedResponse) +} + +func (s *GoesTestSuite) TestGet(c *C) { + indexName := "testget" + docType := "tweet" + docId := "111" + source := map[string]interface{}{ + "f1": "foo", + "f2": "foo", + } + + conn := NewConnection(ES_HOST, ES_PORT) + conn.DeleteIndex(indexName) + + _, err := conn.CreateIndex(indexName, map[string]interface{}{}) + c.Assert(err, IsNil) + defer conn.DeleteIndex(indexName) + + d := Document{ + Index: indexName, + Type: docType, + Id: docId, + Fields: source, + } + + _, err = conn.Index(d, url.Values{}) + c.Assert(err, IsNil) + + response, err := conn.Get(indexName, docType, docId, url.Values{}) + c.Assert(err, IsNil) + + expectedResponse := Response{ + Index: indexName, + Type: docType, + Id: docId, + Version: 1, + Exists: true, + Source: source, + } + + c.Assert(response, DeepEquals, expectedResponse) + + fields := make(url.Values, 1) + fields.Set("fields", "f1") + response, err = conn.Get(indexName, docType, docId, fields) + c.Assert(err, IsNil) + + expectedResponse = Response{ + Index: indexName, + Type: docType, + Id: docId, + Version: 1, + Exists: true, + Fields: map[string]interface{}{ + "f1": "foo", + }, + } + + c.Assert(response, DeepEquals, expectedResponse) +} + +func (s *GoesTestSuite) TestSearch(c *C) { + indexName := "testsearch" + docType := "tweet" + docId := "1234" + source := map[string]interface{}{ + "user": "foo", + "message": "bar", + } + + conn := NewConnection(ES_HOST, ES_PORT) + conn.DeleteIndex(indexName) + + _, err := conn.CreateIndex(indexName, map[string]interface{}{}) + c.Assert(err, IsNil) + //defer conn.DeleteIndex(indexName) + + d := Document{ + Index: indexName, + Type: docType, + Id: docId, + Fields: source, + } + + _, err = conn.Index(d, url.Values{}) + c.Assert(err, IsNil) + + _, err = conn.RefreshIndex(indexName) + c.Assert(err, IsNil) + + // I can feel my eyes bleeding + query := map[string]interface{}{ + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "must": []map[string]interface{}{ + map[string]interface{}{ + "match_all": map[string]interface{}{}, + }, + }, + }, + }, + } + response, err := conn.Search(query, []string{indexName}, []string{docType}) + + expectedHits := Hits{ + Total: 1, + MaxScore: 1.0, + Hits: []Hit{ + Hit{ + Index: indexName, + Type: docType, + Id: docId, + Score: 1.0, + Source: source, + }, + }, + } + + c.Assert(response.Hits, DeepEquals, expectedHits) +} diff --git a/structs.go b/structs.go new file mode 100644 index 0000000..f5e8b38 --- /dev/null +++ b/structs.go @@ -0,0 +1,142 @@ +// Copyright 2013 Belogik. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package goes + +import ( + "net/url" +) + +// Represents a Connection object to elasticsearch +type Connection struct { + // The host to connect to + Host string + + // The port to use + Port string +} + +// Represents a Request to elasticsearch +type Request struct { + // Which connection will be used + Conn *Connection + + // A search query + Query interface{} + + // Which index to search into + IndexList []string + + // Which type to search into + TypeList []string + + // HTTP Method to user (GET, POST ...) + method string + + // Which api keyword (_search, _bulk, etc) to use + api string + + // Bulk data + bulkData []byte + + // A list of extra URL arguments + ExtraArgs url.Values + + // Used for the id field when indexing a document + id string +} + +// Represents a search Response from elasticsearch +type Response struct { + Ok bool + Acknowledged bool + Error string + Status uint64 + Took uint64 + TimedOut bool `json:"timed_out"` + Shards Shard `json:"_shards"` + Hits Hits + Index string `json:"_index"` + Id string `json:"_id"` + Type string `json:"_type"` + Version int `json:"_version"` + Found bool + + // Used by the _stats API + All All `json:"_all"` + + // Used by the _bulk API + Items []map[string]Item `json:"items,omitempty"` + + // Used by the GET API + Exists bool + Source map[string]interface{} `json:"_source"` + Fields map[string]interface{} `json:"fields"` +} + +// Represents a document to send to elasticsearch +type Document struct { + // XXX : interface as we can support nil values + Index interface{} + Type string + Id interface{} + BulkCommand string + Fields map[string]interface{} +} + +// Represents the "items" field in a _bulk response +type Item struct { + Ok bool `json:"ok"` + Type string `json:"_type"` + Id string `json:"_id"` + Index string `json:"_index"` + Version int `json:"_version"` +} + +// Represents the "_all" field when calling the _stats API +// This is minimal but this is what I only need +type All struct { + Indices map[string]StatIndex `json:"indices"` + Primaries map[string]StatPrimary `json:"primaries"` +} + +type StatIndex struct { + Primaries map[string]StatPrimary `json:"primaries"` +} + +type StatPrimary struct { + // primary/docs: + Count int + Deleted int +} + +// Represents the "shard" struct as returned by elasticsearch +type Shard struct { + Total uint64 + Successful uint64 + Failed uint64 +} + +// Represent a hit returned by a search +type Hit struct { + Index string `json:"_index"` + Type string `json:"_type"` + Id string `json:"_id"` + Score float64 `json:"_score"` + Source map[string]interface{} `json:"_source"` + Fields map[string]interface{} `json:"fields"` +} + +// Represent the hits structure as returned by elasticsearch +type Hits struct { + Total uint64 + // max_score may contain the "null" value + MaxScore interface{} `json:"max_score"` + Hits []Hit +} + +type SearchError struct { + Msg string + StatusCode uint64 +}