@@ -0,0 +1 @@ | |||||
*.test |
@@ -0,0 +1,27 @@ | |||||
Copyright (c) 2013 Belogik. All rights reserved. | |||||
Redistribution and use in source and binary forms, with or without | |||||
modification, are permitted provided that the following conditions are | |||||
met: | |||||
* Redistributions of source code must retain the above copyright | |||||
notice, this list of conditions and the following disclaimer. | |||||
* Redistributions in binary form must reproduce the above | |||||
copyright notice, this list of conditions and the following disclaimer | |||||
in the documentation and/or other materials provided with the | |||||
distribution. | |||||
* Neither the name of Belogik nor the names of its | |||||
contributors may be used to endorse or promote products derived from | |||||
this software without specific prior written permission. | |||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | |||||
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | |||||
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR | |||||
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT | |||||
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, | |||||
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT | |||||
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, | |||||
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY | |||||
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | |||||
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | |||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
@@ -0,0 +1,10 @@ | |||||
help: | |||||
@echo "Available targets:" | |||||
@echo "- test: run tests" | |||||
@echo "- installdependencies: installs dependencies declared in dependencies.txt" | |||||
installdependencies: | |||||
cat dependencies.txt | xargs go get | |||||
test: installdependencies | |||||
go test -i && go test |
@@ -0,0 +1 @@ | |||||
- Add Gzip support to bulk data to save bandwith |
@@ -0,0 +1 @@ | |||||
launchpad.net/gocheck |
@@ -0,0 +1,143 @@ | |||||
// Copyright 2013 Belogik. All rights reserved. | |||||
// Use of this source code is governed by a BSD-style | |||||
// license that can be found in the LICENSE file. | |||||
package goes_test | |||||
import ( | |||||
"fmt" | |||||
"goes" | |||||
"net/url" | |||||
) | |||||
func ExampleConnection_CreateIndex() { | |||||
conn := goes.NewConnection("localhost", "9200") | |||||
mapping := map[string]interface{}{ | |||||
"settings": map[string]interface{}{ | |||||
"index.number_of_shards": 1, | |||||
"index.number_of_replicas": 0, | |||||
}, | |||||
"mappings": map[string]interface{}{ | |||||
"_default_": map[string]interface{}{ | |||||
"_source": map[string]interface{}{ | |||||
"enabled": true, | |||||
}, | |||||
"_all": map[string]interface{}{ | |||||
"enabled": false, | |||||
}, | |||||
}, | |||||
}, | |||||
} | |||||
resp, err := conn.CreateIndex("test", mapping) | |||||
if err != nil { | |||||
panic(err) | |||||
} | |||||
fmt.Printf("%s", resp) | |||||
} | |||||
func ExampleConnection_DeleteIndex() { | |||||
conn := goes.NewConnection("localhost", "9200") | |||||
resp, err := conn.DeleteIndex("yourinde") | |||||
if err != nil { | |||||
panic(err) | |||||
} | |||||
fmt.Printf("%s", resp) | |||||
} | |||||
func ExampleConnection_RefreshIndex() { | |||||
conn := goes.NewConnection("localhost", "9200") | |||||
resp, err := conn.RefreshIndex("yourindex") | |||||
if err != nil { | |||||
panic(err) | |||||
} | |||||
fmt.Printf("%s", resp) | |||||
} | |||||
func ExampleConnection_Search() { | |||||
conn := goes.NewConnection("localhost", "9200") | |||||
var query = map[string]interface{}{ | |||||
"query": map[string]interface{}{ | |||||
"bool": map[string]interface{}{ | |||||
"must": map[string]interface{}{ | |||||
"match_all": map[string]interface{}{}, | |||||
}, | |||||
}, | |||||
}, | |||||
"from": 0, | |||||
"size": 100, | |||||
"fields": []string{"onefield"}, | |||||
"filter": map[string]interface{}{ | |||||
"range": map[string]interface{}{ | |||||
"somefield": map[string]interface{}{ | |||||
"from": "some date", | |||||
"to": "some date", | |||||
"include_lower": false, | |||||
"include_upper": false, | |||||
}, | |||||
}, | |||||
}, | |||||
} | |||||
searchResults, err := conn.Search(query, []string{"someindex"}, []string{""}) | |||||
if err != nil { | |||||
panic(err) | |||||
} | |||||
fmt.Printf("%s", searchResults) | |||||
} | |||||
func ExampleConnection_Index() { | |||||
conn := goes.NewConnection("localhost", "9200") | |||||
d := goes.Document{ | |||||
Index: "twitter", | |||||
Type: "tweet", | |||||
Fields: map[string]interface{}{ | |||||
"user": "foo", | |||||
"message": "bar", | |||||
}, | |||||
} | |||||
extraArgs := make(url.Values, 1) | |||||
extraArgs.Set("ttl", "86400000") | |||||
response, err := conn.Index(d, extraArgs) | |||||
if err != nil { | |||||
panic(err) | |||||
} | |||||
fmt.Printf("%s", response) | |||||
} | |||||
func ExampleConnection_Delete() { | |||||
conn := goes.NewConnection("localhost", "9200") | |||||
//[create index, index document ...] | |||||
d := goes.Document{ | |||||
Index: "twitter", | |||||
Type: "tweet", | |||||
Id: "1", | |||||
Fields: map[string]interface{}{ | |||||
"user": "foo", | |||||
}, | |||||
} | |||||
response, err := conn.Delete(d, url.Values{}) | |||||
if err != nil { | |||||
panic(err) | |||||
} | |||||
fmt.Printf("%s", response) | |||||
} |
@@ -0,0 +1,288 @@ | |||||
// Copyright 2013 Belogik. All rights reserved. | |||||
// Use of this source code is governed by a BSD-style | |||||
// license that can be found in the LICENSE file. | |||||
package goes | |||||
import ( | |||||
"bytes" | |||||
"encoding/json" | |||||
"errors" | |||||
"fmt" | |||||
"io/ioutil" | |||||
"net/http" | |||||
"net/url" | |||||
"strings" | |||||
) | |||||
const ( | |||||
BULK_COMMAND_INDEX = "index" | |||||
BULK_COMMAND_DELETE = "delete" | |||||
) | |||||
func (err *SearchError) Error() string { | |||||
return fmt.Sprintf("[%d] %s", err.StatusCode, err.Msg) | |||||
} | |||||
// NewConnection initiates a new Connection to an elasticsearch server | |||||
// | |||||
// This function is pretty useless for now but might be useful in a near future | |||||
// if wee need more features like connection pooling or load balancing. | |||||
func NewConnection(host string, port string) *Connection { | |||||
return &Connection{host, port} | |||||
} | |||||
// CreateIndex creates a new index represented by a name and a mapping | |||||
func (c *Connection) CreateIndex(name string, mapping map[string]interface{}) (Response, error) { | |||||
r := Request{ | |||||
Conn: c, | |||||
Query: mapping, | |||||
IndexList: []string{name}, | |||||
method: "PUT", | |||||
} | |||||
return r.Run() | |||||
} | |||||
// DeleteIndex deletes an index represented by a name | |||||
func (c *Connection) DeleteIndex(name string) (Response, error) { | |||||
r := Request{ | |||||
Conn: c, | |||||
IndexList: []string{name}, | |||||
method: "DELETE", | |||||
} | |||||
return r.Run() | |||||
} | |||||
// RefreshIndex refreshes an index represented by a name | |||||
func (c *Connection) RefreshIndex(name string) (Response, error) { | |||||
r := Request{ | |||||
Conn: c, | |||||
IndexList: []string{name}, | |||||
method: "POST", | |||||
api: "_refresh", | |||||
} | |||||
return r.Run() | |||||
} | |||||
func (c *Connection) FetchStats() (Response, error) { | |||||
r := Request{ | |||||
Conn: c, | |||||
method: "GET", | |||||
api: "_stats", | |||||
} | |||||
return r.Run() | |||||
} | |||||
// Bulk adds multiple documents in bulk mode to the index for a given type | |||||
func (c *Connection) BulkSend(index string, documents []Document) (Response, error) { | |||||
// We do not generate a traditionnal JSON here (often a one liner) | |||||
// Elasticsearch expects one line of JSON per line (EOL = \n) | |||||
// plus an extra \n at the very end of the document | |||||
// | |||||
// More informations about the Bulk JSON format for Elasticsearch: | |||||
// | |||||
// - http://www.elasticsearch.org/guide/reference/api/bulk.html | |||||
// | |||||
// This is quite annoying for us as we can not use the simple JSON | |||||
// Marshaler available in Run(). | |||||
// | |||||
// We have to generate this special JSON by ourselves which leads to | |||||
// the code below. | |||||
// | |||||
// I know it is unreadable I must find an elegant way to fix this. | |||||
bulkData := []byte{} | |||||
for _, doc := range documents { | |||||
header := map[string]interface{}{ | |||||
doc.BulkCommand: map[string]interface{}{ | |||||
"_index": doc.Index, | |||||
"_type": doc.Type, | |||||
"_id": doc.Id, | |||||
}, | |||||
} | |||||
temp, err := json.Marshal(header) | |||||
if err != nil { | |||||
return Response{}, err | |||||
} | |||||
temp = append(temp, '\n') | |||||
bulkData = append(bulkData, temp[:]...) | |||||
if len(doc.Fields) > 0 { | |||||
fields := map[string]interface{}{} | |||||
for fieldName, fieldValue := range doc.Fields { | |||||
fields[fieldName] = fieldValue | |||||
} | |||||
temp, err = json.Marshal(fields) | |||||
if err != nil { | |||||
return Response{}, err | |||||
} | |||||
temp = append(temp, '\n') | |||||
bulkData = append(bulkData, temp[:]...) | |||||
} | |||||
} | |||||
r := Request{ | |||||
Conn: c, | |||||
IndexList: []string{index}, | |||||
method: "POST", | |||||
api: "_bulk", | |||||
bulkData: bulkData, | |||||
} | |||||
return r.Run() | |||||
} | |||||
// Search executes a search query against an index | |||||
func (c *Connection) Search(query map[string]interface{}, indexList []string, typeList []string) (Response, error) { | |||||
r := Request{ | |||||
Conn: c, | |||||
Query: query, | |||||
IndexList: indexList, | |||||
TypeList: typeList, | |||||
method: "POST", | |||||
api: "_search", | |||||
} | |||||
return r.Run() | |||||
} | |||||
// Get a typed document by its id | |||||
func (c *Connection) Get(index string, documentType string, id string, extraArgs url.Values) (Response, error) { | |||||
r := Request{ | |||||
Conn: c, | |||||
IndexList: []string{index}, | |||||
method: "GET", | |||||
api: documentType + "/" + id, | |||||
ExtraArgs: extraArgs, | |||||
} | |||||
return r.Run() | |||||
} | |||||
// Index indexes a Document | |||||
// The extraArgs is a list of url.Values that you can send to elasticsearch as | |||||
// URL arguments, for example, to control routing, ttl, version, op_type, etc. | |||||
func (c *Connection) Index(d Document, extraArgs url.Values) (Response, error) { | |||||
r := Request{ | |||||
Conn: c, | |||||
Query: d.Fields, | |||||
IndexList: []string{d.Index.(string)}, | |||||
TypeList: []string{d.Type}, | |||||
ExtraArgs: extraArgs, | |||||
method: "POST", | |||||
} | |||||
if d.Id != nil { | |||||
r.method = "PUT" | |||||
r.id = d.Id.(string) | |||||
} | |||||
return r.Run() | |||||
} | |||||
// Delete deletes a Document d | |||||
// The extraArgs is a list of url.Values that you can send to elasticsearch as | |||||
// URL arguments, for example, to control routing. | |||||
func (c *Connection) Delete(d Document, extraArgs url.Values) (Response, error) { | |||||
r := Request{ | |||||
Conn: c, | |||||
IndexList: []string{d.Index.(string)}, | |||||
TypeList: []string{d.Type}, | |||||
ExtraArgs: extraArgs, | |||||
method: "DELETE", | |||||
id: d.Id.(string), | |||||
} | |||||
return r.Run() | |||||
} | |||||
// Run executes an elasticsearch Request. It converts data to Json, sends the | |||||
// request and return the Response obtained | |||||
func (req *Request) Run() (Response, error) { | |||||
postData := []byte{} | |||||
// XXX : refactor this | |||||
if req.api == "_bulk" { | |||||
postData = req.bulkData | |||||
} else { | |||||
b, err := json.Marshal(req.Query) | |||||
if err != nil { | |||||
return Response{}, err | |||||
} | |||||
postData = b | |||||
} | |||||
reader := bytes.NewReader(postData) | |||||
client := http.DefaultClient | |||||
newReq, err := http.NewRequest(req.method, req.Url(), reader) | |||||
if err != nil { | |||||
return Response{}, err | |||||
} | |||||
if req.method == "POST" || req.method == "PUT" { | |||||
newReq.Header.Set("Content-Type", "application/x-www-form-urlencoded") | |||||
} | |||||
resp, err := client.Do(newReq) | |||||
if err != nil { | |||||
return Response{}, err | |||||
} | |||||
defer resp.Body.Close() | |||||
body, err := ioutil.ReadAll(resp.Body) | |||||
if err != nil { | |||||
return Response{}, err | |||||
} | |||||
if resp.StatusCode > 201 && resp.StatusCode < 400 { | |||||
return Response{}, errors.New(string(body)) | |||||
} | |||||
esResp := new(Response) | |||||
err = json.Unmarshal(body, &esResp) | |||||
if err != nil { | |||||
return Response{}, err | |||||
} | |||||
if esResp.Error != "" { | |||||
return Response{}, &SearchError{esResp.Error, esResp.Status} | |||||
} | |||||
return *esResp, nil | |||||
} | |||||
// Url builds a Request for a URL | |||||
func (r *Request) Url() string { | |||||
path := "/" + strings.Join(r.IndexList, ",") | |||||
if len(r.TypeList) > 0 { | |||||
path += "/" + strings.Join(r.TypeList, ",") | |||||
} | |||||
// XXX : for indexing documents using the normal (non bulk) API | |||||
if len(r.api) == 0 && len(r.id) > 0 { | |||||
path += "/" + r.id | |||||
} | |||||
path += "/" + r.api | |||||
u := url.URL{ | |||||
Scheme: "http", | |||||
Host: fmt.Sprintf("%s:%s", r.Conn.Host, r.Conn.Port), | |||||
Path: path, | |||||
RawQuery: r.ExtraArgs.Encode(), | |||||
} | |||||
return u.String() | |||||
} |
@@ -0,0 +1,553 @@ | |||||
// Copyright 2013 Belogik. All rights reserved. | |||||
// Use of this source code is governed by a BSD-style | |||||
// license that can be found in the LICENSE file. | |||||
package goes | |||||
import ( | |||||
. "launchpad.net/gocheck" | |||||
"net/url" | |||||
"os" | |||||
"testing" | |||||
"time" | |||||
) | |||||
var ( | |||||
ES_HOST = "localhost" | |||||
ES_PORT = "9200" | |||||
) | |||||
// Hook up gocheck into the gotest runner. | |||||
func Test(t *testing.T) { TestingT(t) } | |||||
type GoesTestSuite struct{} | |||||
var _ = Suite(&GoesTestSuite{}) | |||||
func (s *GoesTestSuite) SetUpTest(c *C) { | |||||
h := os.Getenv("TEST_ELASTICSEARCH_HOST") | |||||
if h != "" { | |||||
ES_HOST = h | |||||
} | |||||
p := os.Getenv("TEST_ELASTICSEARCH_PORT") | |||||
if p != "" { | |||||
ES_PORT = p | |||||
} | |||||
} | |||||
func (s *GoesTestSuite) TestNewConnection(c *C) { | |||||
conn := NewConnection(ES_HOST, ES_PORT) | |||||
c.Assert(conn, DeepEquals, &Connection{ES_HOST, ES_PORT}) | |||||
} | |||||
func (s *GoesTestSuite) TestUrl(c *C) { | |||||
conn := NewConnection(ES_HOST, ES_PORT) | |||||
r := Request{ | |||||
Conn: conn, | |||||
Query: "q", | |||||
IndexList: []string{"i"}, | |||||
TypeList: []string{}, | |||||
method: "GET", | |||||
api: "_search", | |||||
} | |||||
c.Assert(r.Url(), Equals, "http://"+ES_HOST+":"+ES_PORT+"/i/_search") | |||||
r.IndexList = []string{"a", "b"} | |||||
c.Assert(r.Url(), Equals, "http://"+ES_HOST+":"+ES_PORT+"/a,b/_search") | |||||
r.TypeList = []string{"c", "d"} | |||||
c.Assert(r.Url(), Equals, "http://"+ES_HOST+":"+ES_PORT+"/a,b/c,d/_search") | |||||
r.ExtraArgs = make(url.Values, 1) | |||||
r.ExtraArgs.Set("version", "1") | |||||
c.Assert(r.Url(), Equals, "http://"+ES_HOST+":"+ES_PORT+"/a,b/c,d/_search?version=1") | |||||
r.id = "1234" | |||||
r.api = "" | |||||
c.Assert(r.Url(), Equals, "http://"+ES_HOST+":"+ES_PORT+"/a,b/c,d/1234/?version=1") | |||||
} | |||||
func (s *GoesTestSuite) TestEsDown(c *C) { | |||||
conn := NewConnection("a.b.c.d", "1234") | |||||
var query = map[string]interface{}{"query": "foo"} | |||||
r := Request{ | |||||
Conn: conn, | |||||
Query: query, | |||||
IndexList: []string{"i"}, | |||||
method: "GET", | |||||
api: "_search", | |||||
} | |||||
_, err := r.Run() | |||||
c.Assert(err.Error(), Equals, "Get http://a.b.c.d:1234/i/_search: lookup a.b.c.d: no such host") | |||||
} | |||||
func (s *GoesTestSuite) TestRunMissingIndex(c *C) { | |||||
conn := NewConnection(ES_HOST, ES_PORT) | |||||
var query = map[string]interface{}{"query": "foo"} | |||||
r := Request{ | |||||
Conn: conn, | |||||
Query: query, | |||||
IndexList: []string{"i"}, | |||||
method: "GET", | |||||
api: "_search", | |||||
} | |||||
_, err := r.Run() | |||||
c.Assert(err.Error(), Equals, "[404] IndexMissingException[[i] missing]") | |||||
} | |||||
func (s *GoesTestSuite) TestCreateIndex(c *C) { | |||||
indexName := "testcreateindexgoes" | |||||
conn := NewConnection(ES_HOST, ES_PORT) | |||||
defer conn.DeleteIndex(indexName) | |||||
mapping := map[string]interface{}{ | |||||
"settings": map[string]interface{}{ | |||||
"index.number_of_shards": 1, | |||||
"index.number_of_replicas": 0, | |||||
}, | |||||
"mappings": map[string]interface{}{ | |||||
"_default_": map[string]interface{}{ | |||||
"_source": map[string]interface{}{ | |||||
"enabled": false, | |||||
}, | |||||
"_all": map[string]interface{}{ | |||||
"enabled": false, | |||||
}, | |||||
}, | |||||
}, | |||||
} | |||||
resp, err := conn.CreateIndex(indexName, mapping) | |||||
c.Assert(err, IsNil) | |||||
c.Assert(resp.Ok, Equals, true) | |||||
c.Assert(resp.Acknowledged, Equals, true) | |||||
} | |||||
func (s *GoesTestSuite) TestDeleteIndexInexistantIndex(c *C) { | |||||
conn := NewConnection(ES_HOST, ES_PORT) | |||||
resp, err := conn.DeleteIndex("foobar") | |||||
c.Assert(err.Error(), Equals, "[404] IndexMissingException[[foobar] missing]") | |||||
c.Assert(resp, DeepEquals, Response{}) | |||||
} | |||||
func (s *GoesTestSuite) TestDeleteIndexExistingIndex(c *C) { | |||||
conn := NewConnection(ES_HOST, ES_PORT) | |||||
indexName := "testdeleteindexexistingindex" | |||||
_, err := conn.CreateIndex(indexName, map[string]interface{}{}) | |||||
c.Assert(err, IsNil) | |||||
resp, err := conn.DeleteIndex(indexName) | |||||
c.Assert(err, IsNil) | |||||
expectedResponse := Response{} | |||||
expectedResponse.Ok = true | |||||
expectedResponse.Acknowledged = true | |||||
c.Assert(resp, DeepEquals, expectedResponse) | |||||
} | |||||
func (s *GoesTestSuite) TestRefreshIndex(c *C) { | |||||
conn := NewConnection(ES_HOST, ES_PORT) | |||||
indexName := "testrefreshindex" | |||||
_, err := conn.CreateIndex(indexName, map[string]interface{}{}) | |||||
c.Assert(err, IsNil) | |||||
resp, err := conn.RefreshIndex(indexName) | |||||
c.Assert(err, IsNil) | |||||
c.Assert(resp.Ok, Equals, true) | |||||
_, err = conn.DeleteIndex(indexName) | |||||
c.Assert(err, IsNil) | |||||
} | |||||
func (s *GoesTestSuite) TestBulkSend(c *C) { | |||||
indexName := "testbulkadd" | |||||
docType := "tweet" | |||||
tweets := []Document{ | |||||
Document{ | |||||
Id: "123", | |||||
Index: nil, | |||||
Type: docType, | |||||
BulkCommand: BULK_COMMAND_INDEX, | |||||
Fields: map[string]interface{}{ | |||||
"user": "foo", | |||||
"message": "some foo message", | |||||
}, | |||||
}, | |||||
Document{ | |||||
Id: nil, | |||||
Index: indexName, | |||||
Type: docType, | |||||
BulkCommand: BULK_COMMAND_INDEX, | |||||
Fields: map[string]interface{}{ | |||||
"user": "bar", | |||||
"message": "some bar message", | |||||
}, | |||||
}, | |||||
} | |||||
conn := NewConnection(ES_HOST, ES_PORT) | |||||
_, err := conn.CreateIndex(indexName, nil) | |||||
c.Assert(err, IsNil) | |||||
response, err := conn.BulkSend(indexName, tweets) | |||||
i := Item{ | |||||
Ok: true, | |||||
Id: "123", | |||||
Type: docType, | |||||
Version: 1, | |||||
Index: indexName, | |||||
} | |||||
c.Assert(response.Items[0][BULK_COMMAND_INDEX], Equals, i) | |||||
c.Assert(err, IsNil) | |||||
_, err = conn.RefreshIndex(indexName) | |||||
c.Assert(err, IsNil) | |||||
var query = map[string]interface{}{ | |||||
"query": map[string]interface{}{ | |||||
"match_all": map[string]interface{}{}, | |||||
}, | |||||
} | |||||
searchResults, err := conn.Search(query, []string{indexName}, []string{}) | |||||
c.Assert(err, IsNil) | |||||
var expectedTotal uint64 = 2 | |||||
c.Assert(searchResults.Hits.Total, Equals, expectedTotal) | |||||
extraDocId := "" | |||||
checked := 0 | |||||
for _, hit := range searchResults.Hits.Hits { | |||||
if hit.Source["user"] == "foo" { | |||||
c.Assert(hit.Id, Equals, "123") | |||||
checked++ | |||||
} | |||||
if hit.Source["user"] == "bar" { | |||||
c.Assert(len(hit.Id) > 0, Equals, true) | |||||
extraDocId = hit.Id | |||||
checked++ | |||||
} | |||||
} | |||||
c.Assert(checked, Equals, 2) | |||||
docToDelete := []Document{ | |||||
Document{ | |||||
Id: "123", | |||||
Index: indexName, | |||||
Type: docType, | |||||
BulkCommand: BULK_COMMAND_DELETE, | |||||
}, | |||||
Document{ | |||||
Id: extraDocId, | |||||
Index: indexName, | |||||
Type: docType, | |||||
BulkCommand: BULK_COMMAND_DELETE, | |||||
}, | |||||
} | |||||
response, err = conn.BulkSend(indexName, docToDelete) | |||||
i = Item{ | |||||
Ok: true, | |||||
Id: "123", | |||||
Type: docType, | |||||
Version: 2, | |||||
Index: indexName, | |||||
} | |||||
c.Assert(response.Items[0][BULK_COMMAND_DELETE], Equals, i) | |||||
c.Assert(err, IsNil) | |||||
_, err = conn.RefreshIndex(indexName) | |||||
c.Assert(err, IsNil) | |||||
searchResults, err = conn.Search(query, []string{indexName}, []string{}) | |||||
c.Assert(err, IsNil) | |||||
expectedTotal = 0 | |||||
c.Assert(searchResults.Hits.Total, Equals, expectedTotal) | |||||
_, err = conn.DeleteIndex(indexName) | |||||
c.Assert(err, IsNil) | |||||
} | |||||
func (s *GoesTestSuite) TestFetchStats(c *C) { | |||||
conn := NewConnection(ES_HOST, ES_PORT) | |||||
indexName := "testfetchstats" | |||||
_, err := conn.CreateIndex(indexName, map[string]interface{}{}) | |||||
c.Assert(err, IsNil) | |||||
// we must wait for a bit otherwise ES crashes | |||||
time.Sleep(1 * time.Second) | |||||
response, err := conn.FetchStats() | |||||
c.Assert(err, IsNil) | |||||
c.Assert(response.All.Indices[indexName].Primaries["docs"].Count, Equals, 0) | |||||
_, err = conn.DeleteIndex(indexName) | |||||
c.Assert(err, IsNil) | |||||
} | |||||
func (s *GoesTestSuite) TestIndexIdDefined(c *C) { | |||||
indexName := "testindexiddefined" | |||||
docType := "tweet" | |||||
docId := "1234" | |||||
conn := NewConnection(ES_HOST, ES_PORT) | |||||
// just in case | |||||
conn.DeleteIndex(indexName) | |||||
_, err := conn.CreateIndex(indexName, map[string]interface{}{}) | |||||
c.Assert(err, IsNil) | |||||
defer conn.DeleteIndex(indexName) | |||||
d := Document{ | |||||
Index: indexName, | |||||
Type: docType, | |||||
Id: docId, | |||||
Fields: map[string]interface{}{ | |||||
"user": "foo", | |||||
"message": "bar", | |||||
}, | |||||
} | |||||
extraArgs := make(url.Values, 1) | |||||
extraArgs.Set("ttl", "86400000") | |||||
response, err := conn.Index(d, extraArgs) | |||||
c.Assert(err, IsNil) | |||||
expectedResponse := Response{ | |||||
Ok: true, | |||||
Index: indexName, | |||||
Id: docId, | |||||
Type: docType, | |||||
Version: 1, | |||||
} | |||||
c.Assert(response, DeepEquals, expectedResponse) | |||||
} | |||||
func (s *GoesTestSuite) TestIndexIdNotDefined(c *C) { | |||||
indexName := "testindexidnotdefined" | |||||
docType := "tweet" | |||||
conn := NewConnection(ES_HOST, ES_PORT) | |||||
// just in case | |||||
conn.DeleteIndex(indexName) | |||||
_, err := conn.CreateIndex(indexName, map[string]interface{}{}) | |||||
c.Assert(err, IsNil) | |||||
defer conn.DeleteIndex(indexName) | |||||
d := Document{ | |||||
Index: indexName, | |||||
Type: docType, | |||||
Fields: map[string]interface{}{ | |||||
"user": "foo", | |||||
"message": "bar", | |||||
}, | |||||
} | |||||
response, err := conn.Index(d, url.Values{}) | |||||
c.Assert(err, IsNil) | |||||
c.Assert(response.Ok, Equals, true) | |||||
c.Assert(response.Index, Equals, indexName) | |||||
c.Assert(response.Type, Equals, docType) | |||||
c.Assert(response.Version, Equals, 1) | |||||
c.Assert(response.Id != "", Equals, true) | |||||
} | |||||
func (s *GoesTestSuite) TestDelete(c *C) { | |||||
indexName := "testdelete" | |||||
docType := "tweet" | |||||
docId := "1234" | |||||
conn := NewConnection(ES_HOST, ES_PORT) | |||||
// just in case | |||||
conn.DeleteIndex(indexName) | |||||
_, err := conn.CreateIndex(indexName, map[string]interface{}{}) | |||||
c.Assert(err, IsNil) | |||||
defer conn.DeleteIndex(indexName) | |||||
d := Document{ | |||||
Index: indexName, | |||||
Type: docType, | |||||
Id: docId, | |||||
Fields: map[string]interface{}{ | |||||
"user": "foo", | |||||
}, | |||||
} | |||||
_, err = conn.Index(d, url.Values{}) | |||||
c.Assert(err, IsNil) | |||||
response, err := conn.Delete(d, url.Values{}) | |||||
c.Assert(err, IsNil) | |||||
expectedResponse := Response{ | |||||
Ok: true, | |||||
Found: true, | |||||
Index: indexName, | |||||
Type: docType, | |||||
Id: docId, | |||||
// XXX : even after a DELETE the version number seems to be incremented | |||||
Version: 2, | |||||
} | |||||
c.Assert(response, DeepEquals, expectedResponse) | |||||
response, err = conn.Delete(d, url.Values{}) | |||||
c.Assert(err, IsNil) | |||||
expectedResponse = Response{ | |||||
Ok: true, | |||||
Found: false, | |||||
Index: indexName, | |||||
Type: docType, | |||||
Id: docId, | |||||
// XXX : even after a DELETE the version number seems to be incremented | |||||
Version: 3, | |||||
} | |||||
c.Assert(response, DeepEquals, expectedResponse) | |||||
} | |||||
func (s *GoesTestSuite) TestGet(c *C) { | |||||
indexName := "testget" | |||||
docType := "tweet" | |||||
docId := "111" | |||||
source := map[string]interface{}{ | |||||
"f1": "foo", | |||||
"f2": "foo", | |||||
} | |||||
conn := NewConnection(ES_HOST, ES_PORT) | |||||
conn.DeleteIndex(indexName) | |||||
_, err := conn.CreateIndex(indexName, map[string]interface{}{}) | |||||
c.Assert(err, IsNil) | |||||
defer conn.DeleteIndex(indexName) | |||||
d := Document{ | |||||
Index: indexName, | |||||
Type: docType, | |||||
Id: docId, | |||||
Fields: source, | |||||
} | |||||
_, err = conn.Index(d, url.Values{}) | |||||
c.Assert(err, IsNil) | |||||
response, err := conn.Get(indexName, docType, docId, url.Values{}) | |||||
c.Assert(err, IsNil) | |||||
expectedResponse := Response{ | |||||
Index: indexName, | |||||
Type: docType, | |||||
Id: docId, | |||||
Version: 1, | |||||
Exists: true, | |||||
Source: source, | |||||
} | |||||
c.Assert(response, DeepEquals, expectedResponse) | |||||
fields := make(url.Values, 1) | |||||
fields.Set("fields", "f1") | |||||
response, err = conn.Get(indexName, docType, docId, fields) | |||||
c.Assert(err, IsNil) | |||||
expectedResponse = Response{ | |||||
Index: indexName, | |||||
Type: docType, | |||||
Id: docId, | |||||
Version: 1, | |||||
Exists: true, | |||||
Fields: map[string]interface{}{ | |||||
"f1": "foo", | |||||
}, | |||||
} | |||||
c.Assert(response, DeepEquals, expectedResponse) | |||||
} | |||||
func (s *GoesTestSuite) TestSearch(c *C) { | |||||
indexName := "testsearch" | |||||
docType := "tweet" | |||||
docId := "1234" | |||||
source := map[string]interface{}{ | |||||
"user": "foo", | |||||
"message": "bar", | |||||
} | |||||
conn := NewConnection(ES_HOST, ES_PORT) | |||||
conn.DeleteIndex(indexName) | |||||
_, err := conn.CreateIndex(indexName, map[string]interface{}{}) | |||||
c.Assert(err, IsNil) | |||||
//defer conn.DeleteIndex(indexName) | |||||
d := Document{ | |||||
Index: indexName, | |||||
Type: docType, | |||||
Id: docId, | |||||
Fields: source, | |||||
} | |||||
_, err = conn.Index(d, url.Values{}) | |||||
c.Assert(err, IsNil) | |||||
_, err = conn.RefreshIndex(indexName) | |||||
c.Assert(err, IsNil) | |||||
// I can feel my eyes bleeding | |||||
query := map[string]interface{}{ | |||||
"query": map[string]interface{}{ | |||||
"bool": map[string]interface{}{ | |||||
"must": []map[string]interface{}{ | |||||
map[string]interface{}{ | |||||
"match_all": map[string]interface{}{}, | |||||
}, | |||||
}, | |||||
}, | |||||
}, | |||||
} | |||||
response, err := conn.Search(query, []string{indexName}, []string{docType}) | |||||
expectedHits := Hits{ | |||||
Total: 1, | |||||
MaxScore: 1.0, | |||||
Hits: []Hit{ | |||||
Hit{ | |||||
Index: indexName, | |||||
Type: docType, | |||||
Id: docId, | |||||
Score: 1.0, | |||||
Source: source, | |||||
}, | |||||
}, | |||||
} | |||||
c.Assert(response.Hits, DeepEquals, expectedHits) | |||||
} |
@@ -0,0 +1,142 @@ | |||||
// Copyright 2013 Belogik. All rights reserved. | |||||
// Use of this source code is governed by a BSD-style | |||||
// license that can be found in the LICENSE file. | |||||
package goes | |||||
import ( | |||||
"net/url" | |||||
) | |||||
// Represents a Connection object to elasticsearch | |||||
type Connection struct { | |||||
// The host to connect to | |||||
Host string | |||||
// The port to use | |||||
Port string | |||||
} | |||||
// Represents a Request to elasticsearch | |||||
type Request struct { | |||||
// Which connection will be used | |||||
Conn *Connection | |||||
// A search query | |||||
Query interface{} | |||||
// Which index to search into | |||||
IndexList []string | |||||
// Which type to search into | |||||
TypeList []string | |||||
// HTTP Method to user (GET, POST ...) | |||||
method string | |||||
// Which api keyword (_search, _bulk, etc) to use | |||||
api string | |||||
// Bulk data | |||||
bulkData []byte | |||||
// A list of extra URL arguments | |||||
ExtraArgs url.Values | |||||
// Used for the id field when indexing a document | |||||
id string | |||||
} | |||||
// Represents a search Response from elasticsearch | |||||
type Response struct { | |||||
Ok bool | |||||
Acknowledged bool | |||||
Error string | |||||
Status uint64 | |||||
Took uint64 | |||||
TimedOut bool `json:"timed_out"` | |||||
Shards Shard `json:"_shards"` | |||||
Hits Hits | |||||
Index string `json:"_index"` | |||||
Id string `json:"_id"` | |||||
Type string `json:"_type"` | |||||
Version int `json:"_version"` | |||||
Found bool | |||||
// Used by the _stats API | |||||
All All `json:"_all"` | |||||
// Used by the _bulk API | |||||
Items []map[string]Item `json:"items,omitempty"` | |||||
// Used by the GET API | |||||
Exists bool | |||||
Source map[string]interface{} `json:"_source"` | |||||
Fields map[string]interface{} `json:"fields"` | |||||
} | |||||
// Represents a document to send to elasticsearch | |||||
type Document struct { | |||||
// XXX : interface as we can support nil values | |||||
Index interface{} | |||||
Type string | |||||
Id interface{} | |||||
BulkCommand string | |||||
Fields map[string]interface{} | |||||
} | |||||
// Represents the "items" field in a _bulk response | |||||
type Item struct { | |||||
Ok bool `json:"ok"` | |||||
Type string `json:"_type"` | |||||
Id string `json:"_id"` | |||||
Index string `json:"_index"` | |||||
Version int `json:"_version"` | |||||
} | |||||
// Represents the "_all" field when calling the _stats API | |||||
// This is minimal but this is what I only need | |||||
type All struct { | |||||
Indices map[string]StatIndex `json:"indices"` | |||||
Primaries map[string]StatPrimary `json:"primaries"` | |||||
} | |||||
type StatIndex struct { | |||||
Primaries map[string]StatPrimary `json:"primaries"` | |||||
} | |||||
type StatPrimary struct { | |||||
// primary/docs: | |||||
Count int | |||||
Deleted int | |||||
} | |||||
// Represents the "shard" struct as returned by elasticsearch | |||||
type Shard struct { | |||||
Total uint64 | |||||
Successful uint64 | |||||
Failed uint64 | |||||
} | |||||
// Represent a hit returned by a search | |||||
type Hit struct { | |||||
Index string `json:"_index"` | |||||
Type string `json:"_type"` | |||||
Id string `json:"_id"` | |||||
Score float64 `json:"_score"` | |||||
Source map[string]interface{} `json:"_source"` | |||||
Fields map[string]interface{} `json:"fields"` | |||||
} | |||||
// Represent the hits structure as returned by elasticsearch | |||||
type Hits struct { | |||||
Total uint64 | |||||
// max_score may contain the "null" value | |||||
MaxScore interface{} `json:"max_score"` | |||||
Hits []Hit | |||||
} | |||||
type SearchError struct { | |||||
Msg string | |||||
StatusCode uint64 | |||||
} |