From a018ac0716fa7ad0eb8037512a53cc6823d4405a Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Tue, 27 Sep 2016 13:56:13 -0500 Subject: [PATCH 01/14] Rename Connection to Client --- example_test.go | 6 ++-- goes.go | 59 ++++++++++++++++++++------------------- goes_test.go | 74 ++++++++++++++++++++++++------------------------- structs.go | 28 +++++++++---------- 4 files changed, 84 insertions(+), 83 deletions(-) diff --git a/example_test.go b/example_test.go index d049ac1..afb0bab 100644 --- a/example_test.go +++ b/example_test.go @@ -19,7 +19,7 @@ var ( ES_PORT = "9200" ) -func getConnection() (conn *goes.Connection) { +func getConnection() (conn *goes.Client) { h := os.Getenv("TEST_ELASTICSEARCH_HOST") if h == "" { h = ES_HOST @@ -30,7 +30,7 @@ func getConnection() (conn *goes.Connection) { p = ES_PORT } - conn = goes.NewConnection(h, p) + conn = goes.NewClient(h, p) return } @@ -177,7 +177,7 @@ func ExampleConnectionOverrideHttpClient() { Transport: tr, } conn := getConnection() - conn.WithClient(cl) + conn.WithHTTPClient(cl) fmt.Printf("%v\n", conn.Client) } diff --git a/goes.go b/goes.go index 107ce6a..afcb74b 100644 --- a/goes.go +++ b/goes.go @@ -27,21 +27,22 @@ func (err *SearchError) Error() string { return fmt.Sprintf("[%d] %s", err.StatusCode, err.Msg) } -// NewConnection initiates a new Connection to an elasticsearch server +// NewClient initiates a new client for an elasticsearch server // // This function is pretty useless for now but might be useful in a near future // if wee need more features like connection pooling or load balancing. -func NewConnection(host string, port string) *Connection { - return &Connection{host, port, http.DefaultClient} +func NewClient(host string, port string) *Client { + return &Client{host, port, http.DefaultClient} } -func (c *Connection) WithClient(cl *http.Client) *Connection { +// WithHTTPClient sets the http.Client to be used with the connection. Returns the original client. +func (c *Client) WithHTTPClient(cl *http.Client) *Client { c.Client = cl return c } // CreateIndex creates a new index represented by a name and a mapping -func (c *Connection) CreateIndex(name string, mapping interface{}) (*Response, error) { +func (c *Client) CreateIndex(name string, mapping interface{}) (*Response, error) { r := Request{ Conn: c, Query: mapping, @@ -53,7 +54,7 @@ func (c *Connection) CreateIndex(name string, mapping interface{}) (*Response, e } // DeleteIndex deletes an index represented by a name -func (c *Connection) DeleteIndex(name string) (*Response, error) { +func (c *Client) DeleteIndex(name string) (*Response, error) { r := Request{ Conn: c, IndexList: []string{name}, @@ -64,7 +65,7 @@ func (c *Connection) DeleteIndex(name string) (*Response, error) { } // RefreshIndex refreshes an index represented by a name -func (c *Connection) RefreshIndex(name string) (*Response, error) { +func (c *Client) RefreshIndex(name string) (*Response, error) { r := Request{ Conn: c, IndexList: []string{name}, @@ -77,7 +78,7 @@ func (c *Connection) RefreshIndex(name string) (*Response, error) { // UpdateIndexSettings updates settings for existing index represented by a name and a settings // as described here: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-update-settings.html -func (c *Connection) UpdateIndexSettings(name string, settings interface{}) (*Response, error) { +func (c *Client) UpdateIndexSettings(name string, settings interface{}) (*Response, error) { r := Request{ Conn: c, Query: settings, @@ -91,7 +92,7 @@ func (c *Connection) UpdateIndexSettings(name string, settings interface{}) (*Re // Optimize an index represented by a name, extra args are also allowed please check: // http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-optimize.html#indices-optimize -func (c *Connection) Optimize(indexList []string, extraArgs url.Values) (*Response, error) { +func (c *Client) Optimize(indexList []string, extraArgs url.Values) (*Response, error) { r := Request{ Conn: c, IndexList: indexList, @@ -104,7 +105,7 @@ func (c *Connection) Optimize(indexList []string, extraArgs url.Values) (*Respon } // Stats fetches statistics (_stats) for the current elasticsearch server -func (c *Connection) Stats(indexList []string, extraArgs url.Values) (*Response, error) { +func (c *Client) Stats(indexList []string, extraArgs url.Values) (*Response, error) { r := Request{ Conn: c, IndexList: indexList, @@ -118,7 +119,7 @@ func (c *Connection) Stats(indexList []string, extraArgs url.Values) (*Response, // IndexStatus fetches the status (_status) for the indices defined in // indexList. Use _all in indexList to get stats for all indices -func (c *Connection) IndexStatus(indexList []string) (*Response, error) { +func (c *Client) IndexStatus(indexList []string) (*Response, error) { r := Request{ Conn: c, IndexList: indexList, @@ -130,7 +131,7 @@ func (c *Connection) IndexStatus(indexList []string) (*Response, error) { } // Bulk adds multiple documents in bulk mode -func (c *Connection) BulkSend(documents []Document) (*Response, error) { +func (c *Client) BulkSend(documents []Document) (*Response, error) { // We do not generate a traditional JSON here (often a one liner) // Elasticsearch expects one line of JSON per line (EOL = \n) // plus an extra \n at the very end of the document @@ -210,7 +211,7 @@ func (c *Connection) BulkSend(documents []Document) (*Response, error) { } // Search executes a search query against an index -func (c *Connection) Search(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) { +func (c *Client) Search(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) { r := Request{ Conn: c, Query: query, @@ -225,7 +226,7 @@ func (c *Connection) Search(query interface{}, indexList []string, typeList []st } // Count executes a count query against an index, use the Count field in the response for the result -func (c *Connection) Count(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) { +func (c *Client) Count(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) { r := Request{ Conn: c, Query: query, @@ -242,7 +243,7 @@ func (c *Connection) Count(query interface{}, indexList []string, typeList []str //Query runs a query against an index using the provided http method. //This method can be used to execute a delete by query, just pass in "DELETE" //for the HTTP method. -func (c *Connection) Query(query interface{}, indexList []string, typeList []string, httpMethod string, extraArgs url.Values) (*Response, error) { +func (c *Client) Query(query interface{}, indexList []string, typeList []string, httpMethod string, extraArgs url.Values) (*Response, error) { r := Request{ Conn: c, Query: query, @@ -257,7 +258,7 @@ func (c *Connection) Query(query interface{}, indexList []string, typeList []str } // Scan starts scroll over an index -func (c *Connection) Scan(query interface{}, indexList []string, typeList []string, timeout string, size int) (*Response, error) { +func (c *Client) Scan(query interface{}, indexList []string, typeList []string, timeout string, size int) (*Response, error) { v := url.Values{} v.Add("search_type", "scan") v.Add("scroll", timeout) @@ -277,7 +278,7 @@ func (c *Connection) Scan(query interface{}, indexList []string, typeList []stri } // Scroll fetches data by scroll id -func (c *Connection) Scroll(scrollId string, timeout string) (*Response, error) { +func (c *Client) Scroll(scrollId string, timeout string) (*Response, error) { v := url.Values{} v.Add("scroll", timeout) @@ -293,7 +294,7 @@ func (c *Connection) Scroll(scrollId string, timeout string) (*Response, error) } // Get a typed document by its id -func (c *Connection) Get(index string, documentType string, id string, extraArgs url.Values) (*Response, error) { +func (c *Client) Get(index string, documentType string, id string, extraArgs url.Values) (*Response, error) { r := Request{ Conn: c, IndexList: []string{index}, @@ -308,7 +309,7 @@ func (c *Connection) Get(index string, documentType string, id string, extraArgs // Index indexes a Document // The extraArgs is a list of url.Values that you can send to elasticsearch as // URL arguments, for example, to control routing, ttl, version, op_type, etc. -func (c *Connection) Index(d Document, extraArgs url.Values) (*Response, error) { +func (c *Client) Index(d Document, extraArgs url.Values) (*Response, error) { r := Request{ Conn: c, Query: d.Fields, @@ -329,7 +330,7 @@ func (c *Connection) Index(d Document, extraArgs url.Values) (*Response, error) // Delete deletes a Document d // The extraArgs is a list of url.Values that you can send to elasticsearch as // URL arguments, for example, to control routing. -func (c *Connection) Delete(d Document, extraArgs url.Values) (*Response, error) { +func (c *Client) Delete(d Document, extraArgs url.Values) (*Response, error) { r := Request{ Conn: c, IndexList: []string{d.Index.(string)}, @@ -484,7 +485,7 @@ func (b Bucket) Aggregation(name string) Aggregation { } // PutMapping registers a specific mapping for one or more types in one or more indexes -func (c *Connection) PutMapping(typeName string, mapping interface{}, indexes []string) (*Response, error) { +func (c *Client) PutMapping(typeName string, mapping interface{}, indexes []string) (*Response, error) { r := Request{ Conn: c, @@ -497,7 +498,7 @@ func (c *Connection) PutMapping(typeName string, mapping interface{}, indexes [] return r.Run() } -func (c *Connection) GetMapping(types []string, indexes []string) (*Response, error) { +func (c *Client) GetMapping(types []string, indexes []string) (*Response, error) { r := Request{ Conn: c, @@ -510,7 +511,7 @@ func (c *Connection) GetMapping(types []string, indexes []string) (*Response, er } // IndicesExist checks whether index (or indices) exist on the server -func (c *Connection) IndicesExist(indexes []string) (bool, error) { +func (c *Client) IndicesExist(indexes []string) (bool, error) { r := Request{ Conn: c, @@ -523,7 +524,7 @@ func (c *Connection) IndicesExist(indexes []string) (bool, error) { return resp.Status == 200, err } -func (c *Connection) Update(d Document, query interface{}, extraArgs url.Values) (*Response, error) { +func (c *Client) Update(d Document, query interface{}, extraArgs url.Values) (*Response, error) { r := Request{ Conn: c, Query: query, @@ -542,7 +543,7 @@ func (c *Connection) Update(d Document, query interface{}, extraArgs url.Values) } // DeleteMapping deletes a mapping along with all data in the type -func (c *Connection) DeleteMapping(typeName string, indexes []string) (*Response, error) { +func (c *Client) DeleteMapping(typeName string, indexes []string) (*Response, error) { r := Request{ Conn: c, @@ -554,7 +555,7 @@ func (c *Connection) DeleteMapping(typeName string, indexes []string) (*Response return r.Run() } -func (c *Connection) modifyAlias(action string, alias string, indexes []string) (*Response, error) { +func (c *Client) modifyAlias(action string, alias string, indexes []string) (*Response, error) { command := map[string]interface{}{ "actions": make([]map[string]interface{}, 1), } @@ -579,17 +580,17 @@ func (c *Connection) modifyAlias(action string, alias string, indexes []string) } // AddAlias creates an alias to one or more indexes -func (c *Connection) AddAlias(alias string, indexes []string) (*Response, error) { +func (c *Client) AddAlias(alias string, indexes []string) (*Response, error) { return c.modifyAlias("add", alias, indexes) } // RemoveAlias removes an alias to one or more indexes -func (c *Connection) RemoveAlias(alias string, indexes []string) (*Response, error) { +func (c *Client) RemoveAlias(alias string, indexes []string) (*Response, error) { return c.modifyAlias("remove", alias, indexes) } // AliasExists checks whether alias is defined on the server -func (c *Connection) AliasExists(alias string) (bool, error) { +func (c *Client) AliasExists(alias string) (bool, error) { r := Request{ Conn: c, diff --git a/goes_test.go b/goes_test.go index a69fcb1..7484ba2 100644 --- a/goes_test.go +++ b/goes_test.go @@ -39,12 +39,12 @@ func (s *GoesTestSuite) SetUpTest(c *C) { } } -func (s *GoesTestSuite) TestNewConnection(c *C) { - conn := NewConnection(ES_HOST, ES_PORT) - c.Assert(conn, DeepEquals, &Connection{ES_HOST, ES_PORT, http.DefaultClient}) +func (s *GoesTestSuite) TestNewClient(c *C) { + conn := NewClient(ES_HOST, ES_PORT) + c.Assert(conn, DeepEquals, &Client{ES_HOST, ES_PORT, http.DefaultClient}) } -func (s *GoesTestSuite) TestWithClient(c *C) { +func (s *GoesTestSuite) TestWithHTTPClient(c *C) { tr := &http.Transport{ DisableCompression: true, ResponseHeaderTimeout: 1 * time.Second, @@ -52,15 +52,15 @@ func (s *GoesTestSuite) TestWithClient(c *C) { cl := &http.Client{ Transport: tr, } - conn := NewConnection(ES_HOST, ES_PORT).WithClient(cl) + conn := NewClient(ES_HOST, ES_PORT).WithHTTPClient(cl) - c.Assert(conn, DeepEquals, &Connection{ES_HOST, ES_PORT, cl}) + c.Assert(conn, DeepEquals, &Client{ES_HOST, ES_PORT, cl}) c.Assert(conn.Client.Transport.(*http.Transport).DisableCompression, Equals, true) c.Assert(conn.Client.Transport.(*http.Transport).ResponseHeaderTimeout, Equals, 1*time.Second) } func (s *GoesTestSuite) TestUrl(c *C) { - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) r := Request{ Conn: conn, @@ -89,7 +89,7 @@ func (s *GoesTestSuite) TestUrl(c *C) { } func (s *GoesTestSuite) TestEsDown(c *C) { - conn := NewConnection("a.b.c.d", "1234") + conn := NewClient("a.b.c.d", "1234") var query = map[string]interface{}{"query": "foo"} @@ -106,7 +106,7 @@ func (s *GoesTestSuite) TestEsDown(c *C) { } func (s *GoesTestSuite) TestRunMissingIndex(c *C) { - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) var query = map[string]interface{}{"query": "foo"} @@ -125,7 +125,7 @@ func (s *GoesTestSuite) TestRunMissingIndex(c *C) { func (s *GoesTestSuite) TestCreateIndex(c *C) { indexName := "testcreateindexgoes" - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) defer conn.DeleteIndex(indexName) mapping := map[string]interface{}{ @@ -152,7 +152,7 @@ func (s *GoesTestSuite) TestCreateIndex(c *C) { } func (s *GoesTestSuite) TestDeleteIndexInexistantIndex(c *C) { - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) resp, err := conn.DeleteIndex("foobar") c.Assert(err.Error(), Equals, "[404] IndexMissingException[[foobar] missing]") @@ -161,7 +161,7 @@ func (s *GoesTestSuite) TestDeleteIndexInexistantIndex(c *C) { } func (s *GoesTestSuite) TestDeleteIndexExistingIndex(c *C) { - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) indexName := "testdeleteindexexistingindex" @@ -181,7 +181,7 @@ func (s *GoesTestSuite) TestDeleteIndexExistingIndex(c *C) { } func (s *GoesTestSuite) TestUpdateIndexSettings(c *C) { - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) indexName := "testupdateindex" _, err := conn.CreateIndex(indexName, map[string]interface{}{}) @@ -199,7 +199,7 @@ func (s *GoesTestSuite) TestUpdateIndexSettings(c *C) { } func (s *GoesTestSuite) TestRefreshIndex(c *C) { - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) indexName := "testrefreshindex" _, err := conn.CreateIndex(indexName, map[string]interface{}{}) @@ -213,7 +213,7 @@ func (s *GoesTestSuite) TestRefreshIndex(c *C) { } func (s *GoesTestSuite) TestOptimize(c *C) { - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) indexName := "testoptimize" conn.DeleteIndex(indexName) @@ -260,7 +260,7 @@ func (s *GoesTestSuite) TestBulkSend(c *C) { }, } - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) conn.DeleteIndex(indexName) _, err := conn.CreateIndex(indexName, nil) @@ -349,7 +349,7 @@ func (s *GoesTestSuite) TestBulkSend(c *C) { } func (s *GoesTestSuite) TestStats(c *C) { - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) indexName := "teststats" conn.DeleteIndex(indexName) @@ -373,7 +373,7 @@ func (s *GoesTestSuite) TestIndexWithFieldsInStruct(c *C) { docType := "tweet" docId := "1234" - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) // just in case conn.DeleteIndex(indexName) @@ -416,7 +416,7 @@ func (s *GoesTestSuite) TestIndexWithFieldsNotInMapOrStruct(c *C) { docType := "tweet" docId := "1234" - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) // just in case conn.DeleteIndex(indexName) @@ -442,7 +442,7 @@ func (s *GoesTestSuite) TestIndexIdDefined(c *C) { docType := "tweet" docId := "1234" - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) // just in case conn.DeleteIndex(indexName) @@ -481,7 +481,7 @@ func (s *GoesTestSuite) TestIndexIdNotDefined(c *C) { indexName := "testindexidnotdefined" docType := "tweet" - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) // just in case conn.DeleteIndex(indexName) @@ -512,7 +512,7 @@ func (s *GoesTestSuite) TestDelete(c *C) { docType := "tweet" docId := "1234" - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) // just in case conn.DeleteIndex(indexName) @@ -568,7 +568,7 @@ func (s *GoesTestSuite) TestDeleteByQuery(c *C) { docType := "tweet" docId := "1234" - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) // just in case conn.DeleteIndex(indexName) @@ -638,7 +638,7 @@ func (s *GoesTestSuite) TestGet(c *C) { "f2": "foo", } - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) conn.DeleteIndex(indexName) _, err := conn.CreateIndex(indexName, map[string]interface{}{}) @@ -701,7 +701,7 @@ func (s *GoesTestSuite) TestSearch(c *C) { "message": "bar", } - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) conn.DeleteIndex(indexName) _, err := conn.CreateIndex(indexName, map[string]interface{}{}) @@ -761,7 +761,7 @@ func (s *GoesTestSuite) TestCount(c *C) { "message": "bar", } - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) conn.DeleteIndex(indexName) _, err := conn.CreateIndex(indexName, map[string]interface{}{}) @@ -800,7 +800,7 @@ func (s *GoesTestSuite) TestCount(c *C) { func (s *GoesTestSuite) TestIndexStatus(c *C) { indexName := "testindexstatus" - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) conn.DeleteIndex(indexName) mapping := map[string]interface{}{ @@ -909,7 +909,7 @@ func (s *GoesTestSuite) TestScroll(c *C) { }, } - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) mapping := map[string]interface{}{ "settings": map[string]interface{}{ @@ -1011,7 +1011,7 @@ func (s *GoesTestSuite) TestAggregations(c *C) { }, } - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) mapping := map[string]interface{}{ "settings": map[string]interface{}{ @@ -1084,7 +1084,7 @@ func (s *GoesTestSuite) TestPutMapping(c *C) { indexName := "testputmapping" docType := "tweet" - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) // just in case conn.DeleteIndex(indexName) @@ -1125,7 +1125,7 @@ func (s *GoesTestSuite) TestPutMapping(c *C) { func (s *GoesTestSuite) TestIndicesExist(c *C) { indices := []string{"testindicesexist"} - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) // just in case conn.DeleteIndex(indices[0]) @@ -1150,7 +1150,7 @@ func (s *GoesTestSuite) TestUpdate(c *C) { docType := "tweet" docId := "1234" - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) // just in case conn.DeleteIndex(indexName) @@ -1231,7 +1231,7 @@ func (s *GoesTestSuite) TestGetMapping(c *C) { indexName := "testmapping" docType := "tweet" - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) // just in case conn.DeleteIndex(indexName) @@ -1267,7 +1267,7 @@ func (s *GoesTestSuite) TestDeleteMapping(c *C) { indexName := "testdeletemapping" docType := "tweet" - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) // just in case conn.DeleteIndex(indexName) @@ -1319,7 +1319,7 @@ func (s *GoesTestSuite) TestAddAlias(c *C) { "message": "bar", } - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) defer conn.DeleteIndex(indexName) _, err := conn.CreateIndex(indexName, map[string]interface{}{}) @@ -1369,7 +1369,7 @@ func (s *GoesTestSuite) TestRemoveAlias(c *C) { "message": "bar", } - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) defer conn.DeleteIndex(indexName) _, err := conn.CreateIndex(indexName, map[string]interface{}{}) @@ -1404,7 +1404,7 @@ func (s *GoesTestSuite) TestAliasExists(c *C) { index := "testaliasexist_1" alias := "testaliasexists" - conn := NewConnection(ES_HOST, ES_PORT) + conn := NewClient(ES_HOST, ES_PORT) // just in case conn.DeleteIndex(index) diff --git a/structs.go b/structs.go index c726461..0b1d68c 100644 --- a/structs.go +++ b/structs.go @@ -9,8 +9,8 @@ import ( "net/url" ) -// Represents a Connection object to elasticsearch -type Connection struct { +// Client represents a connection to elasticsearch +type Client struct { // The host to connect to Host string @@ -22,10 +22,10 @@ type Connection struct { Client *http.Client } -// Represents a Request to elasticsearch +// Request holds a single request to elasticsearch type Request struct { // Which connection will be used - Conn *Connection + Conn *Client // A search query Query interface{} @@ -55,7 +55,7 @@ type Request struct { id string } -// Represents a Response from elasticsearch +// Response holds an elasticsearch response type Response struct { Acknowledged bool Error string @@ -93,13 +93,13 @@ type Response struct { Raw map[string]interface{} } -// Represents an aggregation from response +// Aggregation holds the aggregation portion of an ES response type Aggregation map[string]interface{} -// Represents a bucket for aggregation +// Bucket represents a bucket for aggregation type Bucket map[string]interface{} -// Represents a document to send to elasticsearch +// Document holds a document to send to elasticsearch type Document struct { // XXX : interface as we can support nil values Index interface{} @@ -109,7 +109,7 @@ type Document struct { Fields interface{} } -// Represents the "items" field in a _bulk response +// Item holds an item from the "items" field in a _bulk response type Item struct { Type string `json:"_type"` Id string `json:"_id"` @@ -119,7 +119,7 @@ type Item struct { Status uint64 `json:"status"` } -// Represents the "_all" field when calling the _stats API +// All represents the "_all" field when calling the _stats API // This is minimal but this is what I only need type All struct { Indices map[string]StatIndex `json:"indices"` @@ -136,14 +136,14 @@ type StatPrimary struct { Deleted int } -// Represents the "shard" struct as returned by elasticsearch +// Shard holds the "shard" struct as returned by elasticsearch type Shard struct { Total uint64 Successful uint64 Failed uint64 } -// Represent a hit returned by a search +// Hit holds a hit returned by a search type Hit struct { Index string `json:"_index"` Type string `json:"_type"` @@ -154,7 +154,7 @@ type Hit struct { Fields map[string]interface{} `json:"fields"` } -// Represent the hits structure as returned by elasticsearch +// Hits holds the hits structure as returned by elasticsearch type Hits struct { Total uint64 // max_score may contain the "null" value @@ -167,7 +167,7 @@ type SearchError struct { StatusCode uint64 } -// Represent the status for a given index for the _status command +// IndexStatus holds the status for a given index for the _status command type IndexStatus struct { // XXX : problem, int will be marshaled to a float64 which seems logical // XXX : is it better to use strings even for int values or to keep From 3d78bba218636b1a3d61e5f3222f777b47a8924a Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Tue, 27 Sep 2016 14:03:39 -0500 Subject: [PATCH 02/14] Add go 1.7, drop older versions of ES --- .travis.yml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index 5af1b53..2b494e8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,14 +3,12 @@ language: go go: - 1.5.4 - 1.6.3 + - 1.7.1 env: global: - GO15VENDOREXPERIMENT=1 matrix: - - ES_VERSION=1.0.3 GROOVY_VER=2.0.0 - - ES_VERSION=1.1.2 GROOVY_VER=2.0.0 - - ES_VERSION=1.2.1 GROOVY_VER=2.2.0 - ES_VERSION=1.3.4 - ES_VERSION=1.4.4 - ES_VERSION=1.5.2 @@ -22,7 +20,6 @@ before_script: - wget https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-${ES_VERSION}.tar.gz - tar -xzf elasticsearch-${ES_VERSION}.tar.gz -C ${HOME}/elasticsearch - "echo 'script.groovy.sandbox.enabled: true' >> ${HOME}/elasticsearch/elasticsearch-${ES_VERSION}/config/elasticsearch.yml" - - 'if [[ "${ES_VERSION}" < "1.3" ]]; then ${HOME}/elasticsearch/elasticsearch-${ES_VERSION}/bin/plugin --install elasticsearch/elasticsearch-lang-groovy/${GROOVY_VER}; fi' - ${HOME}/elasticsearch/elasticsearch-${ES_VERSION}/bin/elasticsearch >/dev/null & - sleep 10 # Wait for ES to start up From 9f7a8396bb5b4b0e8a4b8dd1fc9eecc5f0309e92 Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Tue, 27 Sep 2016 14:19:07 -0500 Subject: [PATCH 03/14] Update code to match Go style guide --- example_test.go | 32 +++---- goes.go | 51 +++++----- goes_test.go | 250 ++++++++++++++++++++++++------------------------ structs.go | 13 ++- 4 files changed, 176 insertions(+), 170 deletions(-) diff --git a/example_test.go b/example_test.go index afb0bab..93c916c 100644 --- a/example_test.go +++ b/example_test.go @@ -19,7 +19,7 @@ var ( ES_PORT = "9200" ) -func getConnection() (conn *goes.Client) { +func getClient() (conn *goes.Client) { h := os.Getenv("TEST_ELASTICSEARCH_HOST") if h == "" { h = ES_HOST @@ -35,8 +35,8 @@ func getConnection() (conn *goes.Client) { return } -func ExampleConnection_CreateIndex() { - conn := getConnection() +func ExampleClient_CreateIndex() { + conn := getClient() mapping := map[string]interface{}{ "settings": map[string]interface{}{ @@ -64,8 +64,8 @@ func ExampleConnection_CreateIndex() { fmt.Printf("%s", resp) } -func ExampleConnection_DeleteIndex() { - conn := getConnection() +func ExampleClient_DeleteIndex() { + conn := getClient() resp, err := conn.DeleteIndex("yourinde") if err != nil { @@ -75,8 +75,8 @@ func ExampleConnection_DeleteIndex() { fmt.Printf("%s", resp) } -func ExampleConnection_RefreshIndex() { - conn := getConnection() +func ExampleClient_RefreshIndex() { + conn := getClient() resp, err := conn.RefreshIndex("yourindex") if err != nil { @@ -86,8 +86,8 @@ func ExampleConnection_RefreshIndex() { fmt.Printf("%s", resp) } -func ExampleConnection_Search() { - conn := getConnection() +func ExampleClient_Search() { + conn := getClient() var query = map[string]interface{}{ "query": map[string]interface{}{ @@ -123,8 +123,8 @@ func ExampleConnection_Search() { fmt.Printf("%s", searchResults) } -func ExampleConnection_Index() { - conn := getConnection() +func ExampleClient_Index() { + conn := getClient() d := goes.Document{ Index: "twitter", @@ -147,15 +147,15 @@ func ExampleConnection_Index() { fmt.Printf("%s", response) } -func ExampleConnection_Delete() { - conn := getConnection() +func ExampleClient_Delete() { + conn := getClient() //[create index, index document ...] d := goes.Document{ Index: "twitter", Type: "tweet", - Id: "1", + ID: "1", Fields: map[string]interface{}{ "user": "foo", }, @@ -169,14 +169,14 @@ func ExampleConnection_Delete() { fmt.Printf("%s", response) } -func ExampleConnectionOverrideHttpClient() { +func ExampleClient_WithHTTPClient() { tr := &http.Transport{ ResponseHeaderTimeout: 1 * time.Second, } cl := &http.Client{ Transport: tr, } - conn := getConnection() + conn := getClient() conn.WithHTTPClient(cl) fmt.Printf("%v\n", conn.Client) diff --git a/goes.go b/goes.go index afcb74b..aa09e48 100644 --- a/goes.go +++ b/goes.go @@ -19,8 +19,10 @@ import ( ) const ( - BULK_COMMAND_INDEX = "index" - BULK_COMMAND_DELETE = "delete" + // BulkCommandIndex specifies a bulk doc should be indexed + BulkCommandIndex = "index" + // BulkCommandDelete specifies a bulk doc should be deleted + BulkCommandDelete = "delete" ) func (err *SearchError) Error() string { @@ -130,7 +132,7 @@ func (c *Client) IndexStatus(indexList []string) (*Response, error) { return r.Run() } -// Bulk adds multiple documents in bulk mode +// BulkSend bulk adds multiple documents in bulk mode func (c *Client) BulkSend(documents []Document) (*Response, error) { // We do not generate a traditional JSON here (often a one liner) // Elasticsearch expects one line of JSON per line (EOL = \n) @@ -158,7 +160,7 @@ func (c *Client) BulkSend(documents []Document) (*Response, error) { doc.BulkCommand: map[string]interface{}{ "_index": doc.Index, "_type": doc.Type, - "_id": doc.Id, + "_id": doc.ID, }, }) @@ -278,7 +280,7 @@ func (c *Client) Scan(query interface{}, indexList []string, typeList []string, } // Scroll fetches data by scroll id -func (c *Client) Scroll(scrollId string, timeout string) (*Response, error) { +func (c *Client) Scroll(scrollID string, timeout string) (*Response, error) { v := url.Values{} v.Add("scroll", timeout) @@ -287,7 +289,7 @@ func (c *Client) Scroll(scrollId string, timeout string) (*Response, error) { method: "POST", api: "_search/scroll", ExtraArgs: v, - Body: []byte(scrollId), + Body: []byte(scrollID), } return r.Run() @@ -319,9 +321,9 @@ func (c *Client) Index(d Document, extraArgs url.Values) (*Response, error) { method: "POST", } - if d.Id != nil { + if d.ID != nil { r.method = "PUT" - r.id = d.Id.(string) + r.id = d.ID.(string) } return r.Run() @@ -337,7 +339,7 @@ func (c *Client) Delete(d Document, extraArgs url.Values) (*Response, error) { TypeList: []string{d.Type}, ExtraArgs: extraArgs, method: "DELETE", - id: d.Id.(string), + id: d.ID.(string), } return r.Run() @@ -400,7 +402,7 @@ func (req *Request) run() ([]byte, uint64, error) { reader := bytes.NewReader(postData) - newReq, err := http.NewRequest(req.method, req.Url(), reader) + newReq, err := http.NewRequest(req.method, req.URL(), reader) if err != nil { return nil, 0, err } @@ -428,26 +430,26 @@ func (req *Request) run() ([]byte, uint64, error) { return body, uint64(resp.StatusCode), nil } -// Url builds a Request for a URL -func (r *Request) Url() string { - path := "/" + strings.Join(r.IndexList, ",") +// URL builds a Request for a URL +func (req *Request) URL() string { + path := "/" + strings.Join(req.IndexList, ",") - if len(r.TypeList) > 0 { - path += "/" + strings.Join(r.TypeList, ",") + if len(req.TypeList) > 0 { + path += "/" + strings.Join(req.TypeList, ",") } // XXX : for indexing documents using the normal (non bulk) API - if len(r.id) > 0 { - path += "/" + r.id + if len(req.id) > 0 { + path += "/" + req.id } - path += "/" + r.api + path += "/" + req.api u := url.URL{ Scheme: "http", - Host: fmt.Sprintf("%s:%s", r.Conn.Host, r.Conn.Port), + Host: fmt.Sprintf("%s:%s", req.Conn.Host, req.Conn.Port), Path: path, - RawQuery: r.ExtraArgs.Encode(), + RawQuery: req.ExtraArgs.Encode(), } return u.String() @@ -479,9 +481,8 @@ func (b Bucket) DocCount() uint64 { func (b Bucket) Aggregation(name string) Aggregation { if agg, ok := b[name]; ok { return agg.(map[string]interface{}) - } else { - return Aggregation{} } + return Aggregation{} } // PutMapping registers a specific mapping for one or more types in one or more indexes @@ -498,6 +499,7 @@ func (c *Client) PutMapping(typeName string, mapping interface{}, indexes []stri return r.Run() } +// GetMapping returns the mappings for the specified types func (c *Client) GetMapping(types []string, indexes []string) (*Response, error) { r := Request{ @@ -524,6 +526,7 @@ func (c *Client) IndicesExist(indexes []string) (bool, error) { return resp.Status == 200, err } +// Update updates the specified document using the _update endpoint func (c *Client) Update(d Document, query interface{}, extraArgs url.Values) (*Response, error) { r := Request{ Conn: c, @@ -535,8 +538,8 @@ func (c *Client) Update(d Document, query interface{}, extraArgs url.Values) (*R api: "_update", } - if d.Id != nil { - r.id = d.Id.(string) + if d.ID != nil { + r.id = d.ID.(string) } return r.Run() diff --git a/goes_test.go b/goes_test.go index 7484ba2..25b77af 100644 --- a/goes_test.go +++ b/goes_test.go @@ -16,8 +16,8 @@ import ( ) var ( - ES_HOST = "localhost" - ES_PORT = "9200" + ESHost = "localhost" + ESPort = "9200" ) // Hook up gocheck into the gotest runner. @@ -30,18 +30,18 @@ var _ = Suite(&GoesTestSuite{}) func (s *GoesTestSuite) SetUpTest(c *C) { h := os.Getenv("TEST_ELASTICSEARCH_HOST") if h != "" { - ES_HOST = h + ESHost = h } p := os.Getenv("TEST_ELASTICSEARCH_PORT") if p != "" { - ES_PORT = p + ESPort = p } } func (s *GoesTestSuite) TestNewClient(c *C) { - conn := NewClient(ES_HOST, ES_PORT) - c.Assert(conn, DeepEquals, &Client{ES_HOST, ES_PORT, http.DefaultClient}) + conn := NewClient(ESHost, ESPort) + c.Assert(conn, DeepEquals, &Client{ESHost, ESPort, http.DefaultClient}) } func (s *GoesTestSuite) TestWithHTTPClient(c *C) { @@ -52,15 +52,15 @@ func (s *GoesTestSuite) TestWithHTTPClient(c *C) { cl := &http.Client{ Transport: tr, } - conn := NewClient(ES_HOST, ES_PORT).WithHTTPClient(cl) + conn := NewClient(ESHost, ESPort).WithHTTPClient(cl) - c.Assert(conn, DeepEquals, &Client{ES_HOST, ES_PORT, cl}) + c.Assert(conn, DeepEquals, &Client{ESHost, ESPort, cl}) c.Assert(conn.Client.Transport.(*http.Transport).DisableCompression, Equals, true) c.Assert(conn.Client.Transport.(*http.Transport).ResponseHeaderTimeout, Equals, 1*time.Second) } func (s *GoesTestSuite) TestUrl(c *C) { - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) r := Request{ Conn: conn, @@ -71,21 +71,21 @@ func (s *GoesTestSuite) TestUrl(c *C) { api: "_search", } - c.Assert(r.Url(), Equals, "http://"+ES_HOST+":"+ES_PORT+"/i/_search") + c.Assert(r.URL(), Equals, "http://"+ESHost+":"+ESPort+"/i/_search") r.IndexList = []string{"a", "b"} - c.Assert(r.Url(), Equals, "http://"+ES_HOST+":"+ES_PORT+"/a,b/_search") + c.Assert(r.URL(), Equals, "http://"+ESHost+":"+ESPort+"/a,b/_search") r.TypeList = []string{"c", "d"} - c.Assert(r.Url(), Equals, "http://"+ES_HOST+":"+ES_PORT+"/a,b/c,d/_search") + c.Assert(r.URL(), Equals, "http://"+ESHost+":"+ESPort+"/a,b/c,d/_search") r.ExtraArgs = make(url.Values, 1) r.ExtraArgs.Set("version", "1") - c.Assert(r.Url(), Equals, "http://"+ES_HOST+":"+ES_PORT+"/a,b/c,d/_search?version=1") + c.Assert(r.URL(), Equals, "http://"+ESHost+":"+ESPort+"/a,b/c,d/_search?version=1") r.id = "1234" r.api = "" - c.Assert(r.Url(), Equals, "http://"+ES_HOST+":"+ES_PORT+"/a,b/c,d/1234/?version=1") + c.Assert(r.URL(), Equals, "http://"+ESHost+":"+ESPort+"/a,b/c,d/1234/?version=1") } func (s *GoesTestSuite) TestEsDown(c *C) { @@ -106,7 +106,7 @@ func (s *GoesTestSuite) TestEsDown(c *C) { } func (s *GoesTestSuite) TestRunMissingIndex(c *C) { - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) var query = map[string]interface{}{"query": "foo"} @@ -125,7 +125,7 @@ func (s *GoesTestSuite) TestRunMissingIndex(c *C) { func (s *GoesTestSuite) TestCreateIndex(c *C) { indexName := "testcreateindexgoes" - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) defer conn.DeleteIndex(indexName) mapping := map[string]interface{}{ @@ -152,7 +152,7 @@ func (s *GoesTestSuite) TestCreateIndex(c *C) { } func (s *GoesTestSuite) TestDeleteIndexInexistantIndex(c *C) { - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) resp, err := conn.DeleteIndex("foobar") c.Assert(err.Error(), Equals, "[404] IndexMissingException[[foobar] missing]") @@ -161,7 +161,7 @@ func (s *GoesTestSuite) TestDeleteIndexInexistantIndex(c *C) { } func (s *GoesTestSuite) TestDeleteIndexExistingIndex(c *C) { - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) indexName := "testdeleteindexexistingindex" @@ -181,7 +181,7 @@ func (s *GoesTestSuite) TestDeleteIndexExistingIndex(c *C) { } func (s *GoesTestSuite) TestUpdateIndexSettings(c *C) { - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) indexName := "testupdateindex" _, err := conn.CreateIndex(indexName, map[string]interface{}{}) @@ -199,7 +199,7 @@ func (s *GoesTestSuite) TestUpdateIndexSettings(c *C) { } func (s *GoesTestSuite) TestRefreshIndex(c *C) { - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) indexName := "testrefreshindex" _, err := conn.CreateIndex(indexName, map[string]interface{}{}) @@ -213,7 +213,7 @@ func (s *GoesTestSuite) TestRefreshIndex(c *C) { } func (s *GoesTestSuite) TestOptimize(c *C) { - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) indexName := "testoptimize" conn.DeleteIndex(indexName) @@ -238,10 +238,10 @@ func (s *GoesTestSuite) TestBulkSend(c *C) { tweets := []Document{ { - Id: "123", + ID: "123", Index: indexName, Type: docType, - BulkCommand: BULK_COMMAND_INDEX, + BulkCommand: BulkCommandIndex, Fields: map[string]interface{}{ "user": "foo", "message": "some foo message", @@ -249,10 +249,10 @@ func (s *GoesTestSuite) TestBulkSend(c *C) { }, { - Id: nil, + ID: nil, Index: indexName, Type: docType, - BulkCommand: BULK_COMMAND_INDEX, + BulkCommand: BulkCommandIndex, Fields: map[string]interface{}{ "user": "bar", "message": "some bar message", @@ -260,7 +260,7 @@ func (s *GoesTestSuite) TestBulkSend(c *C) { }, } - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) conn.DeleteIndex(indexName) _, err := conn.CreateIndex(indexName, nil) @@ -268,13 +268,13 @@ func (s *GoesTestSuite) TestBulkSend(c *C) { response, err := conn.BulkSend(tweets) i := Item{ - Id: "123", + ID: "123", Type: docType, Version: 1, Index: indexName, Status: 201, //201 for indexing ( https://issues.apache.org/jira/browse/CONNECTORS-634 ) } - c.Assert(response.Items[0][BULK_COMMAND_INDEX], Equals, i) + c.Assert(response.Items[0][BulkCommandIndex], Equals, i) c.Assert(err, IsNil) _, err = conn.RefreshIndex(indexName) @@ -292,17 +292,17 @@ func (s *GoesTestSuite) TestBulkSend(c *C) { var expectedTotal uint64 = 2 c.Assert(searchResults.Hits.Total, Equals, expectedTotal) - extraDocId := "" + extraDocID := "" checked := 0 for _, hit := range searchResults.Hits.Hits { if hit.Source["user"] == "foo" { - c.Assert(hit.Id, Equals, "123") + c.Assert(hit.ID, Equals, "123") checked++ } if hit.Source["user"] == "bar" { - c.Assert(len(hit.Id) > 0, Equals, true) - extraDocId = hit.Id + c.Assert(len(hit.ID) > 0, Equals, true) + extraDocID = hit.ID checked++ } } @@ -310,28 +310,28 @@ func (s *GoesTestSuite) TestBulkSend(c *C) { docToDelete := []Document{ { - Id: "123", + ID: "123", Index: indexName, Type: docType, - BulkCommand: BULK_COMMAND_DELETE, + BulkCommand: BulkCommandDelete, }, { - Id: extraDocId, + ID: extraDocID, Index: indexName, Type: docType, - BulkCommand: BULK_COMMAND_DELETE, + BulkCommand: BulkCommandDelete, }, } response, err = conn.BulkSend(docToDelete) i = Item{ - Id: "123", + ID: "123", Type: docType, Version: 2, Index: indexName, Status: 200, //200 for updates } - c.Assert(response.Items[0][BULK_COMMAND_DELETE], Equals, i) + c.Assert(response.Items[0][BulkCommandDelete], Equals, i) c.Assert(err, IsNil) @@ -349,7 +349,7 @@ func (s *GoesTestSuite) TestBulkSend(c *C) { } func (s *GoesTestSuite) TestStats(c *C) { - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) indexName := "teststats" conn.DeleteIndex(indexName) @@ -371,9 +371,9 @@ func (s *GoesTestSuite) TestStats(c *C) { func (s *GoesTestSuite) TestIndexWithFieldsInStruct(c *C) { indexName := "testindexwithfieldsinstruct" docType := "tweet" - docId := "1234" + docID := "1234" - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) // just in case conn.DeleteIndex(indexName) @@ -384,7 +384,7 @@ func (s *GoesTestSuite) TestIndexWithFieldsInStruct(c *C) { d := Document{ Index: indexName, Type: docType, - Id: docId, + ID: docID, Fields: struct { user string message string @@ -402,7 +402,7 @@ func (s *GoesTestSuite) TestIndexWithFieldsInStruct(c *C) { expectedResponse := &Response{ Status: 201, Index: indexName, - Id: docId, + ID: docID, Type: docType, Version: 1, } @@ -414,9 +414,9 @@ func (s *GoesTestSuite) TestIndexWithFieldsInStruct(c *C) { func (s *GoesTestSuite) TestIndexWithFieldsNotInMapOrStruct(c *C) { indexName := "testindexwithfieldsnotinmaporstruct" docType := "tweet" - docId := "1234" + docID := "1234" - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) // just in case conn.DeleteIndex(indexName) @@ -427,7 +427,7 @@ func (s *GoesTestSuite) TestIndexWithFieldsNotInMapOrStruct(c *C) { d := Document{ Index: indexName, Type: docType, - Id: docId, + ID: docID, Fields: "test", } @@ -440,9 +440,9 @@ func (s *GoesTestSuite) TestIndexWithFieldsNotInMapOrStruct(c *C) { func (s *GoesTestSuite) TestIndexIdDefined(c *C) { indexName := "testindexiddefined" docType := "tweet" - docId := "1234" + docID := "1234" - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) // just in case conn.DeleteIndex(indexName) @@ -453,7 +453,7 @@ func (s *GoesTestSuite) TestIndexIdDefined(c *C) { d := Document{ Index: indexName, Type: docType, - Id: docId, + ID: docID, Fields: map[string]interface{}{ "user": "foo", "message": "bar", @@ -468,7 +468,7 @@ func (s *GoesTestSuite) TestIndexIdDefined(c *C) { expectedResponse := &Response{ Status: 201, Index: indexName, - Id: docId, + ID: docID, Type: docType, Version: 1, } @@ -481,7 +481,7 @@ func (s *GoesTestSuite) TestIndexIdNotDefined(c *C) { indexName := "testindexidnotdefined" docType := "tweet" - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) // just in case conn.DeleteIndex(indexName) @@ -504,15 +504,15 @@ func (s *GoesTestSuite) TestIndexIdNotDefined(c *C) { c.Assert(response.Index, Equals, indexName) c.Assert(response.Type, Equals, docType) c.Assert(response.Version, Equals, 1) - c.Assert(response.Id != "", Equals, true) + c.Assert(response.ID != "", Equals, true) } func (s *GoesTestSuite) TestDelete(c *C) { indexName := "testdelete" docType := "tweet" - docId := "1234" + docID := "1234" - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) // just in case conn.DeleteIndex(indexName) @@ -523,7 +523,7 @@ func (s *GoesTestSuite) TestDelete(c *C) { d := Document{ Index: indexName, Type: docType, - Id: docId, + ID: docID, Fields: map[string]interface{}{ "user": "foo", }, @@ -540,7 +540,7 @@ func (s *GoesTestSuite) TestDelete(c *C) { Found: true, Index: indexName, Type: docType, - Id: docId, + ID: docID, // XXX : even after a DELETE the version number seems to be incremented Version: 2, } @@ -555,7 +555,7 @@ func (s *GoesTestSuite) TestDelete(c *C) { Found: false, Index: indexName, Type: docType, - Id: docId, + ID: docID, // XXX : even after a DELETE the version number seems to be incremented Version: 3, } @@ -566,9 +566,9 @@ func (s *GoesTestSuite) TestDelete(c *C) { func (s *GoesTestSuite) TestDeleteByQuery(c *C) { indexName := "testdeletebyquery" docType := "tweet" - docId := "1234" + docID := "1234" - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) // just in case conn.DeleteIndex(indexName) @@ -579,7 +579,7 @@ func (s *GoesTestSuite) TestDeleteByQuery(c *C) { d := Document{ Index: indexName, Type: docType, - Id: docId, + ID: docID, Fields: map[string]interface{}{ "user": "foo", }, @@ -617,7 +617,7 @@ func (s *GoesTestSuite) TestDeleteByQuery(c *C) { Found: false, Index: "", Type: "", - Id: "", + ID: "", Version: 0, } response.Raw = nil @@ -632,13 +632,13 @@ func (s *GoesTestSuite) TestDeleteByQuery(c *C) { func (s *GoesTestSuite) TestGet(c *C) { indexName := "testget" docType := "tweet" - docId := "111" + docID := "111" source := map[string]interface{}{ "f1": "foo", "f2": "foo", } - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) conn.DeleteIndex(indexName) _, err := conn.CreateIndex(indexName, map[string]interface{}{}) @@ -648,21 +648,21 @@ func (s *GoesTestSuite) TestGet(c *C) { d := Document{ Index: indexName, Type: docType, - Id: docId, + ID: docID, Fields: source, } _, err = conn.Index(d, url.Values{}) c.Assert(err, IsNil) - response, err := conn.Get(indexName, docType, docId, url.Values{}) + response, err := conn.Get(indexName, docType, docID, url.Values{}) c.Assert(err, IsNil) expectedResponse := &Response{ Status: 200, Index: indexName, Type: docType, - Id: docId, + ID: docID, Version: 1, Found: true, Source: source, @@ -673,14 +673,14 @@ func (s *GoesTestSuite) TestGet(c *C) { fields := make(url.Values, 1) fields.Set("fields", "f1") - response, err = conn.Get(indexName, docType, docId, fields) + response, err = conn.Get(indexName, docType, docID, fields) c.Assert(err, IsNil) expectedResponse = &Response{ Status: 200, Index: indexName, Type: docType, - Id: docId, + ID: docID, Version: 1, Found: true, Fields: map[string]interface{}{ @@ -695,13 +695,13 @@ func (s *GoesTestSuite) TestGet(c *C) { func (s *GoesTestSuite) TestSearch(c *C) { indexName := "testsearch" docType := "tweet" - docId := "1234" + docID := "1234" source := map[string]interface{}{ "user": "foo", "message": "bar", } - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) conn.DeleteIndex(indexName) _, err := conn.CreateIndex(indexName, map[string]interface{}{}) @@ -711,7 +711,7 @@ func (s *GoesTestSuite) TestSearch(c *C) { d := Document{ Index: indexName, Type: docType, - Id: docId, + ID: docID, Fields: source, } @@ -733,7 +733,7 @@ func (s *GoesTestSuite) TestSearch(c *C) { }, }, } - response, err := conn.Search(query, []string{indexName}, []string{docType}, url.Values{}) + response, _ := conn.Search(query, []string{indexName}, []string{docType}, url.Values{}) expectedHits := Hits{ Total: 1, @@ -742,7 +742,7 @@ func (s *GoesTestSuite) TestSearch(c *C) { { Index: indexName, Type: docType, - Id: docId, + ID: docID, Score: 1.0, Source: source, }, @@ -755,13 +755,13 @@ func (s *GoesTestSuite) TestSearch(c *C) { func (s *GoesTestSuite) TestCount(c *C) { indexName := "testcount" docType := "tweet" - docId := "1234" + docID := "1234" source := map[string]interface{}{ "user": "foo", "message": "bar", } - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) conn.DeleteIndex(indexName) _, err := conn.CreateIndex(indexName, map[string]interface{}{}) @@ -771,7 +771,7 @@ func (s *GoesTestSuite) TestCount(c *C) { d := Document{ Index: indexName, Type: docType, - Id: docId, + ID: docID, Fields: source, } @@ -793,14 +793,14 @@ func (s *GoesTestSuite) TestCount(c *C) { }, }, } - response, err := conn.Count(query, []string{indexName}, []string{docType}, url.Values{}) + response, _ := conn.Count(query, []string{indexName}, []string{docType}, url.Values{}) c.Assert(response.Count, Equals, 1) } func (s *GoesTestSuite) TestIndexStatus(c *C) { indexName := "testindexstatus" - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) conn.DeleteIndex(indexName) mapping := map[string]interface{}{ @@ -876,10 +876,10 @@ func (s *GoesTestSuite) TestScroll(c *C) { tweets := []Document{ { - Id: nil, + ID: nil, Index: indexName, Type: docType, - BulkCommand: BULK_COMMAND_INDEX, + BulkCommand: BulkCommandIndex, Fields: map[string]interface{}{ "user": "foo", "message": "some foo message", @@ -887,10 +887,10 @@ func (s *GoesTestSuite) TestScroll(c *C) { }, { - Id: nil, + ID: nil, Index: indexName, Type: docType, - BulkCommand: BULK_COMMAND_INDEX, + BulkCommand: BulkCommandIndex, Fields: map[string]interface{}{ "user": "bar", "message": "some bar message", @@ -898,10 +898,10 @@ func (s *GoesTestSuite) TestScroll(c *C) { }, { - Id: nil, + ID: nil, Index: indexName, Type: docType, - BulkCommand: BULK_COMMAND_INDEX, + BulkCommand: BulkCommandIndex, Fields: map[string]interface{}{ "user": "foo", "message": "another foo message", @@ -909,7 +909,7 @@ func (s *GoesTestSuite) TestScroll(c *C) { }, } - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) mapping := map[string]interface{}{ "settings": map[string]interface{}{ @@ -943,30 +943,30 @@ func (s *GoesTestSuite) TestScroll(c *C) { scan, err := conn.Scan(query, []string{indexName}, []string{docType}, "1m", 1) c.Assert(err, IsNil) - c.Assert(len(scan.ScrollId) > 0, Equals, true) + c.Assert(len(scan.ScrollID) > 0, Equals, true) - searchResults, err := conn.Scroll(scan.ScrollId, "1m") + searchResults, err := conn.Scroll(scan.ScrollID, "1m") c.Assert(err, IsNil) // some data in first chunk c.Assert(searchResults.Hits.Total, Equals, uint64(2)) - c.Assert(len(searchResults.ScrollId) > 0, Equals, true) + c.Assert(len(searchResults.ScrollID) > 0, Equals, true) c.Assert(len(searchResults.Hits.Hits), Equals, 1) - searchResults, err = conn.Scroll(searchResults.ScrollId, "1m") + searchResults, err = conn.Scroll(searchResults.ScrollID, "1m") c.Assert(err, IsNil) // more data in second chunk c.Assert(searchResults.Hits.Total, Equals, uint64(2)) - c.Assert(len(searchResults.ScrollId) > 0, Equals, true) + c.Assert(len(searchResults.ScrollID) > 0, Equals, true) c.Assert(len(searchResults.Hits.Hits), Equals, 1) - searchResults, err = conn.Scroll(searchResults.ScrollId, "1m") + searchResults, err = conn.Scroll(searchResults.ScrollID, "1m") c.Assert(err, IsNil) // nothing in third chunk c.Assert(searchResults.Hits.Total, Equals, uint64(2)) - c.Assert(len(searchResults.ScrollId) > 0, Equals, true) + c.Assert(len(searchResults.ScrollID) > 0, Equals, true) c.Assert(len(searchResults.Hits.Hits), Equals, 0) } @@ -976,10 +976,10 @@ func (s *GoesTestSuite) TestAggregations(c *C) { tweets := []Document{ { - Id: nil, + ID: nil, Index: indexName, Type: docType, - BulkCommand: BULK_COMMAND_INDEX, + BulkCommand: BulkCommandIndex, Fields: map[string]interface{}{ "user": "foo", "message": "some foo message", @@ -988,10 +988,10 @@ func (s *GoesTestSuite) TestAggregations(c *C) { }, { - Id: nil, + ID: nil, Index: indexName, Type: docType, - BulkCommand: BULK_COMMAND_INDEX, + BulkCommand: BulkCommandIndex, Fields: map[string]interface{}{ "user": "bar", "message": "some bar message", @@ -1000,10 +1000,10 @@ func (s *GoesTestSuite) TestAggregations(c *C) { }, { - Id: nil, + ID: nil, Index: indexName, Type: docType, - BulkCommand: BULK_COMMAND_INDEX, + BulkCommand: BulkCommandIndex, Fields: map[string]interface{}{ "user": "foo", "message": "another foo message", @@ -1011,7 +1011,7 @@ func (s *GoesTestSuite) TestAggregations(c *C) { }, } - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) mapping := map[string]interface{}{ "settings": map[string]interface{}{ @@ -1056,7 +1056,7 @@ func (s *GoesTestSuite) TestAggregations(c *C) { }, } - resp, err := conn.Search(query, []string{indexName}, []string{docType}, url.Values{}) + resp, _ := conn.Search(query, []string{indexName}, []string{docType}, url.Values{}) user, ok := resp.Aggregations["user"] c.Assert(ok, Equals, true) @@ -1084,7 +1084,7 @@ func (s *GoesTestSuite) TestPutMapping(c *C) { indexName := "testputmapping" docType := "tweet" - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) // just in case conn.DeleteIndex(indexName) @@ -1125,7 +1125,7 @@ func (s *GoesTestSuite) TestPutMapping(c *C) { func (s *GoesTestSuite) TestIndicesExist(c *C) { indices := []string{"testindicesexist"} - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) // just in case conn.DeleteIndex(indices[0]) @@ -1137,20 +1137,20 @@ func (s *GoesTestSuite) TestIndicesExist(c *C) { defer conn.DeleteIndex(indices[0]) time.Sleep(200 * time.Millisecond) - exists, err = conn.IndicesExist(indices) + exists, _ = conn.IndicesExist(indices) c.Assert(exists, Equals, true) indices = append(indices, "nonexistent") - exists, err = conn.IndicesExist(indices) + exists, _ = conn.IndicesExist(indices) c.Assert(exists, Equals, false) } func (s *GoesTestSuite) TestUpdate(c *C) { indexName := "testupdate" docType := "tweet" - docId := "1234" + docID := "1234" - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) // just in case conn.DeleteIndex(indexName) @@ -1161,7 +1161,7 @@ func (s *GoesTestSuite) TestUpdate(c *C) { d := Document{ Index: indexName, Type: docType, - Id: docId, + ID: docID, Fields: map[string]interface{}{ "user": "foo", "message": "bar", @@ -1177,7 +1177,7 @@ func (s *GoesTestSuite) TestUpdate(c *C) { expectedResponse := &Response{ Status: 201, Index: indexName, - Id: docId, + ID: docID, Type: docType, Version: 1, } @@ -1208,20 +1208,20 @@ func (s *GoesTestSuite) TestUpdate(c *C) { c.Assert(err, Equals, nil) - response, err = conn.Get(indexName, docType, docId, url.Values{}) + response, err = conn.Get(indexName, docType, docID, url.Values{}) c.Assert(err, Equals, nil) c.Assert(response.Source["counter"], Equals, float64(6)) c.Assert(response.Source["user"], Equals, "foo") c.Assert(response.Source["message"], Equals, "bar") // Test another document, non-existent - docId = "555" - d.Id = docId + docID = "555" + d.ID = docID response, err = conn.Update(d, query, extraArgs) c.Assert(err, Equals, nil) time.Sleep(200 * time.Millisecond) - response, err = conn.Get(indexName, docType, docId, url.Values{}) + response, err = conn.Get(indexName, docType, docID, url.Values{}) c.Assert(err, Equals, nil) c.Assert(response.Source["user"], Equals, "admin") c.Assert(response.Source["message"], Equals, "candybar") @@ -1231,7 +1231,7 @@ func (s *GoesTestSuite) TestGetMapping(c *C) { indexName := "testmapping" docType := "tweet" - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) // just in case conn.DeleteIndex(indexName) @@ -1267,7 +1267,7 @@ func (s *GoesTestSuite) TestDeleteMapping(c *C) { indexName := "testdeletemapping" docType := "tweet" - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) // just in case conn.DeleteIndex(indexName) @@ -1313,13 +1313,13 @@ func (s *GoesTestSuite) TestAddAlias(c *C) { aliasName := "testAlias" indexName := "testalias_1" docType := "testDoc" - docId := "1234" + docID := "1234" source := map[string]interface{}{ "user": "foo", "message": "bar", } - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) defer conn.DeleteIndex(indexName) _, err := conn.CreateIndex(indexName, map[string]interface{}{}) @@ -1329,7 +1329,7 @@ func (s *GoesTestSuite) TestAddAlias(c *C) { d := Document{ Index: indexName, Type: docType, - Id: docId, + ID: docID, Fields: source, } @@ -1342,14 +1342,14 @@ func (s *GoesTestSuite) TestAddAlias(c *C) { c.Assert(err, IsNil) // Get document via alias - response, err := conn.Get(aliasName, docType, docId, url.Values{}) + response, err := conn.Get(aliasName, docType, docID, url.Values{}) c.Assert(err, IsNil) expectedResponse := &Response{ Status: 200, Index: indexName, Type: docType, - Id: docId, + ID: docID, Version: 1, Found: true, Source: source, @@ -1363,13 +1363,13 @@ func (s *GoesTestSuite) TestRemoveAlias(c *C) { aliasName := "testAlias" indexName := "testalias_1" docType := "testDoc" - docId := "1234" + docID := "1234" source := map[string]interface{}{ "user": "foo", "message": "bar", } - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) defer conn.DeleteIndex(indexName) _, err := conn.CreateIndex(indexName, map[string]interface{}{}) @@ -1379,7 +1379,7 @@ func (s *GoesTestSuite) TestRemoveAlias(c *C) { d := Document{ Index: indexName, Type: docType, - Id: docId, + ID: docID, Fields: source, } @@ -1396,7 +1396,7 @@ func (s *GoesTestSuite) TestRemoveAlias(c *C) { c.Assert(err, IsNil) // Get document via alias - _, err = conn.Get(aliasName, docType, docId, url.Values{}) + _, err = conn.Get(aliasName, docType, docID, url.Values{}) c.Assert(err.Error(), Equals, "[404] IndexMissingException[["+aliasName+"] missing]") } @@ -1404,7 +1404,7 @@ func (s *GoesTestSuite) TestAliasExists(c *C) { index := "testaliasexist_1" alias := "testaliasexists" - conn := NewClient(ES_HOST, ES_PORT) + conn := NewClient(ESHost, ESPort) // just in case conn.DeleteIndex(index) @@ -1421,6 +1421,6 @@ func (s *GoesTestSuite) TestAliasExists(c *C) { time.Sleep(200 * time.Millisecond) defer conn.RemoveAlias(alias, []string{index}) - exists, err = conn.AliasExists(alias) + exists, _ = conn.AliasExists(alias) c.Assert(exists, Equals, true) } diff --git a/structs.go b/structs.go index 0b1d68c..00fa131 100644 --- a/structs.go +++ b/structs.go @@ -66,7 +66,7 @@ type Response struct { Shards Shard `json:"_shards"` Hits Hits Index string `json:"_index"` - Id string `json:"_id"` + ID string `json:"_id"` Type string `json:"_type"` Version int `json:"_version"` Found bool @@ -86,7 +86,7 @@ type Response struct { Indices map[string]IndexStatus // Scroll id for iteration - ScrollId string `json:"_scroll_id"` + ScrollID string `json:"_scroll_id"` Aggregations map[string]Aggregation `json:"aggregations,omitempty"` @@ -104,7 +104,7 @@ type Document struct { // XXX : interface as we can support nil values Index interface{} Type string - Id interface{} + ID interface{} BulkCommand string Fields interface{} } @@ -112,7 +112,7 @@ type Document struct { // Item holds an item from the "items" field in a _bulk response type Item struct { Type string `json:"_type"` - Id string `json:"_id"` + ID string `json:"_id"` Index string `json:"_index"` Version int `json:"_version"` Error string `json:"error"` @@ -126,10 +126,12 @@ type All struct { Primaries map[string]StatPrimary `json:"primaries"` } +// StatIndex contains stats for a specific index type StatIndex struct { Primaries map[string]StatPrimary `json:"primaries"` } +// StatPrimary contains stats for a primary index type StatPrimary struct { // primary/docs: Count int @@ -147,7 +149,7 @@ type Shard struct { type Hit struct { Index string `json:"_index"` Type string `json:"_type"` - Id string `json:"_id"` + ID string `json:"_id"` Score float64 `json:"_score"` Source map[string]interface{} `json:"_source"` Highlight map[string]interface{} `json:"highlight"` @@ -162,6 +164,7 @@ type Hits struct { Hits []Hit } +// SearchError holds errors returned from an ES search type SearchError struct { Msg string StatusCode uint64 From c79b7c2b3644f53426ce0e44a355ed2d5c7679da Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Tue, 27 Sep 2016 14:29:47 -0500 Subject: [PATCH 04/14] Add ginkgo for testing --- Makefile | 8 ++++++-- glide.lock | 10 ++++++++-- glide.yaml | 6 ++++++ goes_suite_test.go | 13 +++++++++++++ 4 files changed, 33 insertions(+), 4 deletions(-) create mode 100644 goes_suite_test.go diff --git a/Makefile b/Makefile index 4de8169..d27ce07 100644 --- a/Makefile +++ b/Makefile @@ -2,9 +2,13 @@ help: @echo "Available targets:" @echo "- test: run tests" @echo "- deps: installs dependencies with glide" + @echo "- watch: watch for changes and re-run tests" deps: - glide up + glide install test: deps - go test -i && go test + ginkgo -race -randomizeAllSpecs -r -skipPackage vendor -progress . + +watch: deps + ginkgo watch -race -randomizeAllSpecs -r -skipPackage vendor -progress -notify . diff --git a/glide.lock b/glide.lock index 21728fd..088757e 100644 --- a/glide.lock +++ b/glide.lock @@ -1,6 +1,12 @@ -hash: 7e39b5bd354c3f0770ca3a9d28a74a0523695d18effa1e540f006b325defcfe7 -updated: 2016-07-26T14:50:32.597335795-05:00 +hash: 02db47097959405b1a7e0e1e583c6fbb11c7236c450264909a4e9ac690ef4d47 +updated: 2016-09-27T11:54:53.427061181-05:00 imports: [] testImports: - name: github.com/go-check/check version: 4f90aeace3a26ad7021961c297b22c42160c7b25 +- name: github.com/onsi/ginkgo + version: 462326b1628e124b23f42e87a8f2750e3c4e2d24 + subpackages: + - ginkgo +- name: github.com/onsi/gomega + version: a78ae492d53aad5a7a232d0d0462c14c400e3ee7 diff --git a/glide.yaml b/glide.yaml index 1942ac4..37a1849 100644 --- a/glide.yaml +++ b/glide.yaml @@ -3,3 +3,9 @@ import: [] testImport: - package: github.com/go-check/check version: v1 +- package: github.com/onsi/ginkgo + version: ^1.2.0 + subpackages: + - ginkgo +- package: github.com/onsi/gomega + version: ^1.0.0 diff --git a/goes_suite_test.go b/goes_suite_test.go new file mode 100644 index 0000000..2740b16 --- /dev/null +++ b/goes_suite_test.go @@ -0,0 +1,13 @@ +package goes_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "testing" +) + +func TestGoes(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Goes Suite") +} From 3f0875ed66536671e8e8b14962eb4b4d89dc5262 Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Tue, 27 Sep 2016 14:30:17 -0500 Subject: [PATCH 05/14] make all request fields exported so custom requests can be done --- goes.go | 110 +++++++++++++++++++++++++-------------------------- goes_test.go | 16 ++++---- structs.go | 8 ++-- 3 files changed, 67 insertions(+), 67 deletions(-) diff --git a/goes.go b/goes.go index aa09e48..61af34f 100644 --- a/goes.go +++ b/goes.go @@ -49,7 +49,7 @@ func (c *Client) CreateIndex(name string, mapping interface{}) (*Response, error Conn: c, Query: mapping, IndexList: []string{name}, - method: "PUT", + Method: "PUT", } return r.Run() @@ -60,7 +60,7 @@ func (c *Client) DeleteIndex(name string) (*Response, error) { r := Request{ Conn: c, IndexList: []string{name}, - method: "DELETE", + Method: "DELETE", } return r.Run() @@ -71,8 +71,8 @@ func (c *Client) RefreshIndex(name string) (*Response, error) { r := Request{ Conn: c, IndexList: []string{name}, - method: "POST", - api: "_refresh", + Method: "POST", + API: "_refresh", } return r.Run() @@ -85,8 +85,8 @@ func (c *Client) UpdateIndexSettings(name string, settings interface{}) (*Respon Conn: c, Query: settings, IndexList: []string{name}, - method: "PUT", - api: "_settings", + Method: "PUT", + API: "_settings", } return r.Run() @@ -99,8 +99,8 @@ func (c *Client) Optimize(indexList []string, extraArgs url.Values) (*Response, Conn: c, IndexList: indexList, ExtraArgs: extraArgs, - method: "POST", - api: "_optimize", + Method: "POST", + API: "_optimize", } return r.Run() @@ -112,8 +112,8 @@ func (c *Client) Stats(indexList []string, extraArgs url.Values) (*Response, err Conn: c, IndexList: indexList, ExtraArgs: extraArgs, - method: "GET", - api: "_stats", + Method: "GET", + API: "_stats", } return r.Run() @@ -125,8 +125,8 @@ func (c *Client) IndexStatus(indexList []string) (*Response, error) { r := Request{ Conn: c, IndexList: indexList, - method: "GET", - api: "_status", + Method: "GET", + API: "_status", } return r.Run() @@ -204,9 +204,9 @@ func (c *Client) BulkSend(documents []Document) (*Response, error) { r := Request{ Conn: c, - method: "POST", - api: "_bulk", - bulkData: bytes.Join(bulkData, []byte("\n")), + Method: "POST", + API: "_bulk", + BulkData: bytes.Join(bulkData, []byte("\n")), } return r.Run() @@ -219,8 +219,8 @@ func (c *Client) Search(query interface{}, indexList []string, typeList []string Query: query, IndexList: indexList, TypeList: typeList, - method: "POST", - api: "_search", + Method: "POST", + API: "_search", ExtraArgs: extraArgs, } @@ -234,8 +234,8 @@ func (c *Client) Count(query interface{}, indexList []string, typeList []string, Query: query, IndexList: indexList, TypeList: typeList, - method: "POST", - api: "_count", + Method: "POST", + API: "_count", ExtraArgs: extraArgs, } @@ -251,8 +251,8 @@ func (c *Client) Query(query interface{}, indexList []string, typeList []string, Query: query, IndexList: indexList, TypeList: typeList, - method: httpMethod, - api: "_query", + Method: httpMethod, + API: "_query", ExtraArgs: extraArgs, } @@ -271,8 +271,8 @@ func (c *Client) Scan(query interface{}, indexList []string, typeList []string, Query: query, IndexList: indexList, TypeList: typeList, - method: "POST", - api: "_search", + Method: "POST", + API: "_search", ExtraArgs: v, } @@ -286,8 +286,8 @@ func (c *Client) Scroll(scrollID string, timeout string) (*Response, error) { r := Request{ Conn: c, - method: "POST", - api: "_search/scroll", + Method: "POST", + API: "_search/scroll", ExtraArgs: v, Body: []byte(scrollID), } @@ -300,8 +300,8 @@ func (c *Client) Get(index string, documentType string, id string, extraArgs url r := Request{ Conn: c, IndexList: []string{index}, - method: "GET", - api: documentType + "/" + id, + Method: "GET", + API: documentType + "/" + id, ExtraArgs: extraArgs, } @@ -318,12 +318,12 @@ func (c *Client) Index(d Document, extraArgs url.Values) (*Response, error) { IndexList: []string{d.Index.(string)}, TypeList: []string{d.Type}, ExtraArgs: extraArgs, - method: "POST", + Method: "POST", } if d.ID != nil { - r.method = "PUT" - r.id = d.ID.(string) + r.Method = "PUT" + r.ID = d.ID.(string) } return r.Run() @@ -338,8 +338,8 @@ func (c *Client) Delete(d Document, extraArgs url.Values) (*Response, error) { IndexList: []string{d.Index.(string)}, TypeList: []string{d.Type}, ExtraArgs: extraArgs, - method: "DELETE", - id: d.ID.(string), + Method: "DELETE", + ID: d.ID.(string), } return r.Run() @@ -355,7 +355,7 @@ func (req *Request) Run() (*Response, error) { return esResp, err } - if req.method != "HEAD" { + if req.Method != "HEAD" { err = json.Unmarshal(body, &esResp) if err != nil { return esResp, err @@ -366,7 +366,7 @@ func (req *Request) Run() (*Response, error) { } } - if req.api == "_bulk" && esResp.Errors { + if req.API == "_bulk" && esResp.Errors { for _, item := range esResp.Items { for _, i := range item { if i.Error != "" { @@ -390,8 +390,8 @@ func (req *Request) run() ([]byte, uint64, error) { // XXX : refactor this if len(req.Body) > 0 { postData = req.Body - } else if req.api == "_bulk" { - postData = req.bulkData + } else if req.API == "_bulk" { + postData = req.BulkData } else { b, err := json.Marshal(req.Query) if err != nil { @@ -402,12 +402,12 @@ func (req *Request) run() ([]byte, uint64, error) { reader := bytes.NewReader(postData) - newReq, err := http.NewRequest(req.method, req.URL(), reader) + newReq, err := http.NewRequest(req.Method, req.URL(), reader) if err != nil { return nil, 0, err } - if req.method == "POST" || req.method == "PUT" { + if req.Method == "POST" || req.Method == "PUT" { newReq.Header.Set("Content-Type", "application/json") } @@ -439,11 +439,11 @@ func (req *Request) URL() string { } // XXX : for indexing documents using the normal (non bulk) API - if len(req.id) > 0 { - path += "/" + req.id + if len(req.ID) > 0 { + path += "/" + req.ID } - path += "/" + req.api + path += "/" + req.API u := url.URL{ Scheme: "http", @@ -492,8 +492,8 @@ func (c *Client) PutMapping(typeName string, mapping interface{}, indexes []stri Conn: c, Query: mapping, IndexList: indexes, - method: "PUT", - api: "_mappings/" + typeName, + Method: "PUT", + API: "_mappings/" + typeName, } return r.Run() @@ -505,8 +505,8 @@ func (c *Client) GetMapping(types []string, indexes []string) (*Response, error) r := Request{ Conn: c, IndexList: indexes, - method: "GET", - api: "_mapping/" + strings.Join(types, ","), + Method: "GET", + API: "_mapping/" + strings.Join(types, ","), } return r.Run() @@ -518,7 +518,7 @@ func (c *Client) IndicesExist(indexes []string) (bool, error) { r := Request{ Conn: c, IndexList: indexes, - method: "HEAD", + Method: "HEAD", } resp, err := r.Run() @@ -534,12 +534,12 @@ func (c *Client) Update(d Document, query interface{}, extraArgs url.Values) (*R IndexList: []string{d.Index.(string)}, TypeList: []string{d.Type}, ExtraArgs: extraArgs, - method: "POST", - api: "_update", + Method: "POST", + API: "_update", } if d.ID != nil { - r.id = d.ID.(string) + r.ID = d.ID.(string) } return r.Run() @@ -551,8 +551,8 @@ func (c *Client) DeleteMapping(typeName string, indexes []string) (*Response, er r := Request{ Conn: c, IndexList: indexes, - method: "DELETE", - api: "_mappings/" + typeName, + Method: "DELETE", + API: "_mappings/" + typeName, } return r.Run() @@ -575,8 +575,8 @@ func (c *Client) modifyAlias(action string, alias string, indexes []string) (*Re r := Request{ Conn: c, Query: command, - method: "POST", - api: "_aliases", + Method: "POST", + API: "_aliases", } return r.Run() @@ -597,8 +597,8 @@ func (c *Client) AliasExists(alias string) (bool, error) { r := Request{ Conn: c, - method: "HEAD", - api: "_alias/" + alias, + Method: "HEAD", + API: "_alias/" + alias, } resp, err := r.Run() diff --git a/goes_test.go b/goes_test.go index 25b77af..ddd7731 100644 --- a/goes_test.go +++ b/goes_test.go @@ -67,8 +67,8 @@ func (s *GoesTestSuite) TestUrl(c *C) { Query: "q", IndexList: []string{"i"}, TypeList: []string{}, - method: "GET", - api: "_search", + Method: "GET", + API: "_search", } c.Assert(r.URL(), Equals, "http://"+ESHost+":"+ESPort+"/i/_search") @@ -83,8 +83,8 @@ func (s *GoesTestSuite) TestUrl(c *C) { r.ExtraArgs.Set("version", "1") c.Assert(r.URL(), Equals, "http://"+ESHost+":"+ESPort+"/a,b/c,d/_search?version=1") - r.id = "1234" - r.api = "" + r.ID = "1234" + r.API = "" c.Assert(r.URL(), Equals, "http://"+ESHost+":"+ESPort+"/a,b/c,d/1234/?version=1") } @@ -97,8 +97,8 @@ func (s *GoesTestSuite) TestEsDown(c *C) { Conn: conn, Query: query, IndexList: []string{"i"}, - method: "GET", - api: "_search", + Method: "GET", + API: "_search", } _, err := r.Run() @@ -114,8 +114,8 @@ func (s *GoesTestSuite) TestRunMissingIndex(c *C) { Conn: conn, Query: query, IndexList: []string{"i"}, - method: "GET", - api: "_search", + Method: "GET", + API: "_search", } _, err := r.Run() diff --git a/structs.go b/structs.go index 00fa131..6aca112 100644 --- a/structs.go +++ b/structs.go @@ -37,13 +37,13 @@ type Request struct { TypeList []string // HTTP Method to user (GET, POST ...) - method string + Method string // Which api keyword (_search, _bulk, etc) to use - api string + API string // Bulk data - bulkData []byte + BulkData []byte // Request body Body []byte @@ -52,7 +52,7 @@ type Request struct { ExtraArgs url.Values // Used for the id field when indexing a document - id string + ID string } // Response holds an elasticsearch response From 45269de7cccbb30cc82d7264ba6983fbada7a13e Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Tue, 27 Sep 2016 14:34:50 -0500 Subject: [PATCH 06/14] Make sure ginkgo gets installed --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index d27ce07..01e9ac8 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ help: @echo "- watch: watch for changes and re-run tests" deps: - glide install + glide install && go install github.com/onsi/ginkgo/ginkgo test: deps ginkgo -race -randomizeAllSpecs -r -skipPackage vendor -progress . From c83171bffed8a61960707ad2741976faa86f22df Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Tue, 27 Sep 2016 14:39:58 -0500 Subject: [PATCH 07/14] ensure go vendoring is enabled before running go install --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 01e9ac8..67665bf 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ help: @echo "- watch: watch for changes and re-run tests" deps: - glide install && go install github.com/onsi/ginkgo/ginkgo + glide install && GO15VENDOREXPERIMENT=1 go install github.com/onsi/ginkgo/ginkgo test: deps ginkgo -race -randomizeAllSpecs -r -skipPackage vendor -progress . From 2bd116556bca2f2b843e8091ec736fb01b263da6 Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Tue, 27 Sep 2016 14:42:46 -0500 Subject: [PATCH 08/14] Just use go get in the travis config to ensure ginkgo is installed --- .travis.yml | 1 + Makefile | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 2b494e8..a7e8c2b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -25,6 +25,7 @@ before_script: install: - go get github.com/Masterminds/glide + - go get github.com/onsi/ginkgo/ginkgo script: - make test diff --git a/Makefile b/Makefile index 67665bf..d27ce07 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ help: @echo "- watch: watch for changes and re-run tests" deps: - glide install && GO15VENDOREXPERIMENT=1 go install github.com/onsi/ginkgo/ginkgo + glide install test: deps ginkgo -race -randomizeAllSpecs -r -skipPackage vendor -progress . From 5acff13ac5333ed1ad12e4dddaf664544e139e81 Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Tue, 27 Sep 2016 14:46:09 -0500 Subject: [PATCH 09/14] Just build ginkgo into vendor/bin and use that --- .travis.yml | 1 - Makefile | 7 ++++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index a7e8c2b..2b494e8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -25,7 +25,6 @@ before_script: install: - go get github.com/Masterminds/glide - - go get github.com/onsi/ginkgo/ginkgo script: - make test diff --git a/Makefile b/Makefile index d27ce07..85513fe 100644 --- a/Makefile +++ b/Makefile @@ -5,10 +5,11 @@ help: @echo "- watch: watch for changes and re-run tests" deps: - glide install + glide install && mkdir -p vendor/bin && go build -o vendor/bin/ginkgo github.com/onsi/ginkgo/ginkgo + test: deps - ginkgo -race -randomizeAllSpecs -r -skipPackage vendor -progress . + vendor/bin/ginkgo -race -randomizeAllSpecs -r -skipPackage vendor -progress . watch: deps - ginkgo watch -race -randomizeAllSpecs -r -skipPackage vendor -progress -notify . + vendor/bin/ginkgo watch -race -randomizeAllSpecs -r -skipPackage vendor -progress -notify . From 10e4302b6c980eb8f5f6c099ca2174139a2e5107 Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Tue, 27 Sep 2016 15:08:50 -0500 Subject: [PATCH 10/14] Use relative path to build ginkgo --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 85513fe..e41edcc 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ help: @echo "- watch: watch for changes and re-run tests" deps: - glide install && mkdir -p vendor/bin && go build -o vendor/bin/ginkgo github.com/onsi/ginkgo/ginkgo + glide install && mkdir -p vendor/bin && go build -o vendor/bin/ginkgo ./vendor/github.com/onsi/ginkgo/ginkgo test: deps From db565276fceaa3c88a74b0a344d1b716589cac86 Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Tue, 27 Sep 2016 16:41:27 -0500 Subject: [PATCH 11/14] Provide a way of making external request builders, separate generationg of requests from sending of requests --- goes.go | 248 +++++++++++++++++++-------------------------------- goes_test.go | 19 ++-- request.go | 106 ++++++++++++++++++++++ structs.go | 38 +------- 4 files changed, 207 insertions(+), 204 deletions(-) create mode 100644 request.go diff --git a/goes.go b/goes.go index 61af34f..330b805 100644 --- a/goes.go +++ b/goes.go @@ -46,90 +46,83 @@ func (c *Client) WithHTTPClient(cl *http.Client) *Client { // CreateIndex creates a new index represented by a name and a mapping func (c *Client) CreateIndex(name string, mapping interface{}) (*Response, error) { r := Request{ - Conn: c, Query: mapping, IndexList: []string{name}, Method: "PUT", } - return r.Run() + return c.Do(&r) } // DeleteIndex deletes an index represented by a name func (c *Client) DeleteIndex(name string) (*Response, error) { r := Request{ - Conn: c, IndexList: []string{name}, Method: "DELETE", } - return r.Run() + return c.Do(&r) } // RefreshIndex refreshes an index represented by a name func (c *Client) RefreshIndex(name string) (*Response, error) { r := Request{ - Conn: c, IndexList: []string{name}, Method: "POST", API: "_refresh", } - return r.Run() + return c.Do(&r) } // UpdateIndexSettings updates settings for existing index represented by a name and a settings // as described here: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-update-settings.html func (c *Client) UpdateIndexSettings(name string, settings interface{}) (*Response, error) { r := Request{ - Conn: c, Query: settings, IndexList: []string{name}, Method: "PUT", API: "_settings", } - return r.Run() + return c.Do(&r) } // Optimize an index represented by a name, extra args are also allowed please check: // http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-optimize.html#indices-optimize func (c *Client) Optimize(indexList []string, extraArgs url.Values) (*Response, error) { r := Request{ - Conn: c, IndexList: indexList, ExtraArgs: extraArgs, Method: "POST", API: "_optimize", } - return r.Run() + return c.Do(&r) } // Stats fetches statistics (_stats) for the current elasticsearch server func (c *Client) Stats(indexList []string, extraArgs url.Values) (*Response, error) { r := Request{ - Conn: c, IndexList: indexList, ExtraArgs: extraArgs, Method: "GET", API: "_stats", } - return r.Run() + return c.Do(&r) } // IndexStatus fetches the status (_status) for the indices defined in // indexList. Use _all in indexList to get stats for all indices func (c *Client) IndexStatus(indexList []string) (*Response, error) { r := Request{ - Conn: c, IndexList: indexList, Method: "GET", API: "_status", } - return r.Run() + return c.Do(&r) } // BulkSend bulk adds multiple documents in bulk mode @@ -203,19 +196,33 @@ func (c *Client) BulkSend(documents []Document) (*Response, error) { bulkData[len(bulkData)-1] = []byte(nil) r := Request{ - Conn: c, Method: "POST", API: "_bulk", BulkData: bytes.Join(bulkData, []byte("\n")), } - return r.Run() + resp, err := c.Do(&r) + if err != nil { + return resp, err + } + + if resp.Errors { + for _, item := range resp.Items { + for _, i := range item { + if i.Error != "" { + return resp, &SearchError{i.Error, i.Status} + } + } + } + return resp, &SearchError{Msg: "Unknown error while bulk indexing"} + } + + return resp, err } // Search executes a search query against an index func (c *Client) Search(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) { r := Request{ - Conn: c, Query: query, IndexList: indexList, TypeList: typeList, @@ -224,13 +231,12 @@ func (c *Client) Search(query interface{}, indexList []string, typeList []string ExtraArgs: extraArgs, } - return r.Run() + return c.Do(&r) } // Count executes a count query against an index, use the Count field in the response for the result func (c *Client) Count(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) { r := Request{ - Conn: c, Query: query, IndexList: indexList, TypeList: typeList, @@ -239,7 +245,7 @@ func (c *Client) Count(query interface{}, indexList []string, typeList []string, ExtraArgs: extraArgs, } - return r.Run() + return c.Do(&r) } //Query runs a query against an index using the provided http method. @@ -247,7 +253,6 @@ func (c *Client) Count(query interface{}, indexList []string, typeList []string, //for the HTTP method. func (c *Client) Query(query interface{}, indexList []string, typeList []string, httpMethod string, extraArgs url.Values) (*Response, error) { r := Request{ - Conn: c, Query: query, IndexList: indexList, TypeList: typeList, @@ -256,7 +261,7 @@ func (c *Client) Query(query interface{}, indexList []string, typeList []string, ExtraArgs: extraArgs, } - return r.Run() + return c.Do(&r) } // Scan starts scroll over an index @@ -267,7 +272,6 @@ func (c *Client) Scan(query interface{}, indexList []string, typeList []string, v.Add("size", strconv.Itoa(size)) r := Request{ - Conn: c, Query: query, IndexList: indexList, TypeList: typeList, @@ -276,7 +280,7 @@ func (c *Client) Scan(query interface{}, indexList []string, typeList []string, ExtraArgs: v, } - return r.Run() + return c.Do(&r) } // Scroll fetches data by scroll id @@ -285,27 +289,25 @@ func (c *Client) Scroll(scrollID string, timeout string) (*Response, error) { v.Add("scroll", timeout) r := Request{ - Conn: c, Method: "POST", API: "_search/scroll", ExtraArgs: v, Body: []byte(scrollID), } - return r.Run() + return c.Do(&r) } // Get a typed document by its id func (c *Client) Get(index string, documentType string, id string, extraArgs url.Values) (*Response, error) { r := Request{ - Conn: c, IndexList: []string{index}, Method: "GET", API: documentType + "/" + id, ExtraArgs: extraArgs, } - return r.Run() + return c.Do(&r) } // Index indexes a Document @@ -313,7 +315,6 @@ func (c *Client) Get(index string, documentType string, id string, extraArgs url // URL arguments, for example, to control routing, ttl, version, op_type, etc. func (c *Client) Index(d Document, extraArgs url.Values) (*Response, error) { r := Request{ - Conn: c, Query: d.Fields, IndexList: []string{d.Index.(string)}, TypeList: []string{d.Type}, @@ -326,7 +327,7 @@ func (c *Client) Index(d Document, extraArgs url.Values) (*Response, error) { r.ID = d.ID.(string) } - return r.Run() + return c.Do(&r) } // Delete deletes a Document d @@ -334,7 +335,6 @@ func (c *Client) Index(d Document, extraArgs url.Values) (*Response, error) { // URL arguments, for example, to control routing. func (c *Client) Delete(d Document, extraArgs url.Values) (*Response, error) { r := Request{ - Conn: c, IndexList: []string{d.Index.(string)}, TypeList: []string{d.Type}, ExtraArgs: extraArgs, @@ -342,117 +342,7 @@ func (c *Client) Delete(d Document, extraArgs url.Values) (*Response, error) { ID: d.ID.(string), } - return r.Run() -} - -// Run executes an elasticsearch Request. It converts data to Json, sends the -// request and returns the Response obtained -func (req *Request) Run() (*Response, error) { - body, statusCode, err := req.run() - esResp := &Response{Status: statusCode} - - if err != nil { - return esResp, err - } - - if req.Method != "HEAD" { - err = json.Unmarshal(body, &esResp) - if err != nil { - return esResp, err - } - err = json.Unmarshal(body, &esResp.Raw) - if err != nil { - return esResp, err - } - } - - if req.API == "_bulk" && esResp.Errors { - for _, item := range esResp.Items { - for _, i := range item { - if i.Error != "" { - return esResp, &SearchError{i.Error, i.Status} - } - } - } - return esResp, &SearchError{Msg: "Unknown error while bulk indexing"} - } - - if esResp.Error != "" { - return esResp, &SearchError{esResp.Error, esResp.Status} - } - - return esResp, nil -} - -func (req *Request) run() ([]byte, uint64, error) { - postData := []byte{} - - // XXX : refactor this - if len(req.Body) > 0 { - postData = req.Body - } else if req.API == "_bulk" { - postData = req.BulkData - } else { - b, err := json.Marshal(req.Query) - if err != nil { - return nil, 0, err - } - postData = b - } - - reader := bytes.NewReader(postData) - - newReq, err := http.NewRequest(req.Method, req.URL(), reader) - if err != nil { - return nil, 0, err - } - - if req.Method == "POST" || req.Method == "PUT" { - newReq.Header.Set("Content-Type", "application/json") - } - - resp, err := req.Conn.Client.Do(newReq) - if err != nil { - return nil, 0, err - } - - defer resp.Body.Close() - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, uint64(resp.StatusCode), err - } - - if resp.StatusCode > 201 && resp.StatusCode < 400 { - return nil, uint64(resp.StatusCode), errors.New(string(body)) - } - - return body, uint64(resp.StatusCode), nil -} - -// URL builds a Request for a URL -func (req *Request) URL() string { - path := "/" + strings.Join(req.IndexList, ",") - - if len(req.TypeList) > 0 { - path += "/" + strings.Join(req.TypeList, ",") - } - - // XXX : for indexing documents using the normal (non bulk) API - if len(req.ID) > 0 { - path += "/" + req.ID - } - - path += "/" + req.API - - u := url.URL{ - Scheme: "http", - Host: fmt.Sprintf("%s:%s", req.Conn.Host, req.Conn.Port), - Path: path, - RawQuery: req.ExtraArgs.Encode(), - } - - return u.String() + return c.Do(&r) } // Buckets returns list of buckets in aggregation @@ -489,39 +379,36 @@ func (b Bucket) Aggregation(name string) Aggregation { func (c *Client) PutMapping(typeName string, mapping interface{}, indexes []string) (*Response, error) { r := Request{ - Conn: c, Query: mapping, IndexList: indexes, Method: "PUT", API: "_mappings/" + typeName, } - return r.Run() + return c.Do(&r) } // GetMapping returns the mappings for the specified types func (c *Client) GetMapping(types []string, indexes []string) (*Response, error) { r := Request{ - Conn: c, IndexList: indexes, Method: "GET", API: "_mapping/" + strings.Join(types, ","), } - return r.Run() + return c.Do(&r) } // IndicesExist checks whether index (or indices) exist on the server func (c *Client) IndicesExist(indexes []string) (bool, error) { r := Request{ - Conn: c, IndexList: indexes, Method: "HEAD", } - resp, err := r.Run() + resp, err := c.Do(&r) return resp.Status == 200, err } @@ -529,7 +416,6 @@ func (c *Client) IndicesExist(indexes []string) (bool, error) { // Update updates the specified document using the _update endpoint func (c *Client) Update(d Document, query interface{}, extraArgs url.Values) (*Response, error) { r := Request{ - Conn: c, Query: query, IndexList: []string{d.Index.(string)}, TypeList: []string{d.Type}, @@ -542,20 +428,19 @@ func (c *Client) Update(d Document, query interface{}, extraArgs url.Values) (*R r.ID = d.ID.(string) } - return r.Run() + return c.Do(&r) } // DeleteMapping deletes a mapping along with all data in the type func (c *Client) DeleteMapping(typeName string, indexes []string) (*Response, error) { r := Request{ - Conn: c, IndexList: indexes, Method: "DELETE", API: "_mappings/" + typeName, } - return r.Run() + return c.Do(&r) } func (c *Client) modifyAlias(action string, alias string, indexes []string) (*Response, error) { @@ -573,13 +458,12 @@ func (c *Client) modifyAlias(action string, alias string, indexes []string) (*Re } r := Request{ - Conn: c, Query: command, Method: "POST", API: "_aliases", } - return r.Run() + return c.Do(&r) } // AddAlias creates an alias to one or more indexes @@ -596,12 +480,64 @@ func (c *Client) RemoveAlias(alias string, indexes []string) (*Response, error) func (c *Client) AliasExists(alias string) (bool, error) { r := Request{ - Conn: c, Method: "HEAD", API: "_alias/" + alias, } - resp, err := r.Run() + resp, err := c.Do(&r) return resp.Status == 200, err } + +// Do runs the request returned by the requestor and returns the parsed response +func (c *Client) Do(r Requester) (*Response, error) { + req, err := r.Request() + if err != nil { + return &Response{}, err + } + req.URL.Scheme = "http" + req.URL.Host = fmt.Sprintf("%s:%s", c.Host, c.Port) + + body, statusCode, err := c.doRequest(req) + esResp := &Response{Status: statusCode} + + if err != nil { + return esResp, err + } + + if req.Method != "HEAD" { + err = json.Unmarshal(body, &esResp) + if err != nil { + return esResp, err + } + err = json.Unmarshal(body, &esResp.Raw) + if err != nil { + return esResp, err + } + } + + if esResp.Error != "" { + return esResp, &SearchError{esResp.Error, esResp.Status} + } + + return esResp, nil +} + +func (c *Client) doRequest(req *http.Request) ([]byte, uint64, error) { + resp, err := c.Client.Do(req) + if err != nil { + return nil, 0, err + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, uint64(resp.StatusCode), err + } + + if resp.StatusCode > 201 && resp.StatusCode < 400 { + return nil, uint64(resp.StatusCode), errors.New(string(body)) + } + + return body, uint64(resp.StatusCode), nil +} diff --git a/goes_test.go b/goes_test.go index ddd7731..cf34966 100644 --- a/goes_test.go +++ b/goes_test.go @@ -60,10 +60,9 @@ func (s *GoesTestSuite) TestWithHTTPClient(c *C) { } func (s *GoesTestSuite) TestUrl(c *C) { - conn := NewClient(ESHost, ESPort) + //conn := NewClient(ESHost, ESPort) r := Request{ - Conn: conn, Query: "q", IndexList: []string{"i"}, TypeList: []string{}, @@ -71,21 +70,21 @@ func (s *GoesTestSuite) TestUrl(c *C) { API: "_search", } - c.Assert(r.URL(), Equals, "http://"+ESHost+":"+ESPort+"/i/_search") + c.Assert(r.URL().String(), Equals, "/i/_search") r.IndexList = []string{"a", "b"} - c.Assert(r.URL(), Equals, "http://"+ESHost+":"+ESPort+"/a,b/_search") + c.Assert(r.URL().String(), Equals, "/a,b/_search") r.TypeList = []string{"c", "d"} - c.Assert(r.URL(), Equals, "http://"+ESHost+":"+ESPort+"/a,b/c,d/_search") + c.Assert(r.URL().String(), Equals, "/a,b/c,d/_search") r.ExtraArgs = make(url.Values, 1) r.ExtraArgs.Set("version", "1") - c.Assert(r.URL(), Equals, "http://"+ESHost+":"+ESPort+"/a,b/c,d/_search?version=1") + c.Assert(r.URL().String(), Equals, "/a,b/c,d/_search?version=1") r.ID = "1234" r.API = "" - c.Assert(r.URL(), Equals, "http://"+ESHost+":"+ESPort+"/a,b/c,d/1234/?version=1") + c.Assert(r.URL().String(), Equals, "/a,b/c,d/1234/?version=1") } func (s *GoesTestSuite) TestEsDown(c *C) { @@ -94,13 +93,12 @@ func (s *GoesTestSuite) TestEsDown(c *C) { var query = map[string]interface{}{"query": "foo"} r := Request{ - Conn: conn, Query: query, IndexList: []string{"i"}, Method: "GET", API: "_search", } - _, err := r.Run() + _, err := conn.Do(&r) c.Assert(err, ErrorMatches, ".* no such host") } @@ -111,13 +109,12 @@ func (s *GoesTestSuite) TestRunMissingIndex(c *C) { var query = map[string]interface{}{"query": "foo"} r := Request{ - Conn: conn, Query: query, IndexList: []string{"i"}, Method: "GET", API: "_search", } - _, err := r.Run() + _, err := conn.Do(&r) c.Assert(err.Error(), Equals, "[404] IndexMissingException[[i] missing]") } diff --git a/request.go b/request.go new file mode 100644 index 0000000..abe05a2 --- /dev/null +++ b/request.go @@ -0,0 +1,106 @@ +package goes + +import ( + "bytes" + "encoding/json" + "io/ioutil" + "net/http" + "net/url" + "strings" +) + +// Requester implements Request which builds an HTTP request for Elasticsearch +type Requester interface { + // Request should set the URL and Body (if needed). The host of the URL will be overwritten by the client. + Request() (*http.Request, error) +} + +// Request holds a single request to elasticsearch +type Request struct { + // A search query + Query interface{} + + // Which index to search into + IndexList []string + + // Which type to search into + TypeList []string + + // HTTP Method to user (GET, POST ...) + Method string + + // Which api keyword (_search, _bulk, etc) to use + API string + + // Bulk data + BulkData []byte + + // Request body + Body []byte + + // A list of extra URL arguments + ExtraArgs url.Values + + // Used for the id field when indexing a document + ID string +} + +// URL builds a URL for a Request +func (req *Request) URL() *url.URL { + path := "/" + strings.Join(req.IndexList, ",") + + if len(req.TypeList) > 0 { + path += "/" + strings.Join(req.TypeList, ",") + } + + // XXX : for indexing documents using the normal (non bulk) API + if len(req.ID) > 0 { + path += "/" + req.ID + } + + path += "/" + req.API + + u := url.URL{ + //Scheme: "http", + //Host: fmt.Sprintf("%s:%s", req.Conn.Host, req.Conn.Port), + Path: path, + RawQuery: req.ExtraArgs.Encode(), + } + + return &u +} + +// Request generates an http.Request based on the contents of the Request struct +func (req *Request) Request() (*http.Request, error) { + postData := []byte{} + + // XXX : refactor this + if len(req.Body) > 0 { + postData = req.Body + } else if req.API == "_bulk" { + postData = req.BulkData + } else { + b, err := json.Marshal(req.Query) + if err != nil { + return nil, err + } + postData = b + } + + reader := ioutil.NopCloser(bytes.NewReader(postData)) + + newReq, err := http.NewRequest(req.Method, "", nil) + if err != nil { + return nil, err + } + newReq.URL = req.URL() + newReq.Body = reader + newReq.ContentLength = int64(len(postData)) + + if req.Method == "POST" || req.Method == "PUT" { + newReq.Header.Set("Content-Type", "application/json") + } + return newReq, nil +} + +var _ Requester = (*Request)(nil) diff --git a/structs.go b/structs.go index 6aca112..dbfd686 100644 --- a/structs.go +++ b/structs.go @@ -4,10 +4,7 @@ package goes -import ( - "net/http" - "net/url" -) +import "net/http" // Client represents a connection to elasticsearch type Client struct { @@ -22,39 +19,6 @@ type Client struct { Client *http.Client } -// Request holds a single request to elasticsearch -type Request struct { - // Which connection will be used - Conn *Client - - // A search query - Query interface{} - - // Which index to search into - IndexList []string - - // Which type to search into - TypeList []string - - // HTTP Method to user (GET, POST ...) - Method string - - // Which api keyword (_search, _bulk, etc) to use - API string - - // Bulk data - BulkData []byte - - // Request body - Body []byte - - // A list of extra URL arguments - ExtraArgs url.Values - - // Used for the id field when indexing a document - ID string -} - // Response holds an elasticsearch response type Response struct { Acknowledged bool From e89f41828e5ce6c98d0ecb3107127f19157a7d4d Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Tue, 27 Sep 2016 17:48:14 -0500 Subject: [PATCH 12/14] Remove commented out code --- goes_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/goes_test.go b/goes_test.go index cf34966..1abe586 100644 --- a/goes_test.go +++ b/goes_test.go @@ -60,8 +60,6 @@ func (s *GoesTestSuite) TestWithHTTPClient(c *C) { } func (s *GoesTestSuite) TestUrl(c *C) { - //conn := NewClient(ESHost, ESPort) - r := Request{ Query: "q", IndexList: []string{"i"}, From 2715203d9639915d040622efb5169fad1b9edf19 Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Thu, 13 Oct 2016 02:54:52 -0500 Subject: [PATCH 13/14] Leave off extra leading slash in bulk requests --- request.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/request.go b/request.go index abe05a2..bf8093d 100644 --- a/request.go +++ b/request.go @@ -47,7 +47,10 @@ type Request struct { // URL builds a URL for a Request func (req *Request) URL() *url.URL { - path := "/" + strings.Join(req.IndexList, ",") + var path string + if len(req.IndexList) > 0 { + path = "/" + strings.Join(req.IndexList, ",") + } if len(req.TypeList) > 0 { path += "/" + strings.Join(req.TypeList, ",") From 5d13647d3ceafcecade08a1a3ea30ca634cd5495 Mon Sep 17 00:00:00 2001 From: Paul Bonser Date: Thu, 13 Oct 2016 19:37:40 -0500 Subject: [PATCH 14/14] Handle non-string errors for ES 2+ support --- goes.go | 7 +++++++ structs.go | 6 +++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/goes.go b/goes.go index 330b805..94b3fe4 100644 --- a/goes.go +++ b/goes.go @@ -516,6 +516,13 @@ func (c *Client) Do(r Requester) (*Response, error) { } } + if len(esResp.RawError) > 0 && esResp.RawError[0] == '"' { + json.Unmarshal(esResp.RawError, &esResp.Error) + } else { + esResp.Error = string(esResp.RawError) + } + esResp.RawError = nil + if esResp.Error != "" { return esResp, &SearchError{esResp.Error, esResp.Status} } diff --git a/structs.go b/structs.go index dbfd686..ca08edd 100644 --- a/structs.go +++ b/structs.go @@ -4,7 +4,10 @@ package goes -import "net/http" +import ( + "encoding/json" + "net/http" +) // Client represents a connection to elasticsearch type Client struct { @@ -23,6 +26,7 @@ type Client struct { type Response struct { Acknowledged bool Error string + RawError json.RawMessage `json:"error"` Errors bool Status uint64 Took uint64