|
|
@@ -46,90 +46,83 @@ func (c *Client) WithHTTPClient(cl *http.Client) *Client { |
|
|
|
// CreateIndex creates a new index represented by a name and a mapping |
|
|
|
func (c *Client) CreateIndex(name string, mapping interface{}) (*Response, error) { |
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
Query: mapping, |
|
|
|
IndexList: []string{name}, |
|
|
|
Method: "PUT", |
|
|
|
} |
|
|
|
|
|
|
|
return r.Run() |
|
|
|
return c.Do(&r) |
|
|
|
} |
|
|
|
|
|
|
|
// DeleteIndex deletes an index represented by a name |
|
|
|
func (c *Client) DeleteIndex(name string) (*Response, error) { |
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
IndexList: []string{name}, |
|
|
|
Method: "DELETE", |
|
|
|
} |
|
|
|
|
|
|
|
return r.Run() |
|
|
|
return c.Do(&r) |
|
|
|
} |
|
|
|
|
|
|
|
// RefreshIndex refreshes an index represented by a name |
|
|
|
func (c *Client) RefreshIndex(name string) (*Response, error) { |
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
IndexList: []string{name}, |
|
|
|
Method: "POST", |
|
|
|
API: "_refresh", |
|
|
|
} |
|
|
|
|
|
|
|
return r.Run() |
|
|
|
return c.Do(&r) |
|
|
|
} |
|
|
|
|
|
|
|
// UpdateIndexSettings updates settings for existing index represented by a name and a settings |
|
|
|
// as described here: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-update-settings.html |
|
|
|
func (c *Client) UpdateIndexSettings(name string, settings interface{}) (*Response, error) { |
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
Query: settings, |
|
|
|
IndexList: []string{name}, |
|
|
|
Method: "PUT", |
|
|
|
API: "_settings", |
|
|
|
} |
|
|
|
|
|
|
|
return r.Run() |
|
|
|
return c.Do(&r) |
|
|
|
} |
|
|
|
|
|
|
|
// Optimize an index represented by a name, extra args are also allowed please check: |
|
|
|
// http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-optimize.html#indices-optimize |
|
|
|
func (c *Client) Optimize(indexList []string, extraArgs url.Values) (*Response, error) { |
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
IndexList: indexList, |
|
|
|
ExtraArgs: extraArgs, |
|
|
|
Method: "POST", |
|
|
|
API: "_optimize", |
|
|
|
} |
|
|
|
|
|
|
|
return r.Run() |
|
|
|
return c.Do(&r) |
|
|
|
} |
|
|
|
|
|
|
|
// Stats fetches statistics (_stats) for the current elasticsearch server |
|
|
|
func (c *Client) Stats(indexList []string, extraArgs url.Values) (*Response, error) { |
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
IndexList: indexList, |
|
|
|
ExtraArgs: extraArgs, |
|
|
|
Method: "GET", |
|
|
|
API: "_stats", |
|
|
|
} |
|
|
|
|
|
|
|
return r.Run() |
|
|
|
return c.Do(&r) |
|
|
|
} |
|
|
|
|
|
|
|
// IndexStatus fetches the status (_status) for the indices defined in |
|
|
|
// indexList. Use _all in indexList to get stats for all indices |
|
|
|
func (c *Client) IndexStatus(indexList []string) (*Response, error) { |
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
IndexList: indexList, |
|
|
|
Method: "GET", |
|
|
|
API: "_status", |
|
|
|
} |
|
|
|
|
|
|
|
return r.Run() |
|
|
|
return c.Do(&r) |
|
|
|
} |
|
|
|
|
|
|
|
// BulkSend bulk adds multiple documents in bulk mode |
|
|
@@ -203,19 +196,33 @@ func (c *Client) BulkSend(documents []Document) (*Response, error) { |
|
|
|
bulkData[len(bulkData)-1] = []byte(nil) |
|
|
|
|
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
Method: "POST", |
|
|
|
API: "_bulk", |
|
|
|
BulkData: bytes.Join(bulkData, []byte("\n")), |
|
|
|
} |
|
|
|
|
|
|
|
return r.Run() |
|
|
|
resp, err := c.Do(&r) |
|
|
|
if err != nil { |
|
|
|
return resp, err |
|
|
|
} |
|
|
|
|
|
|
|
if resp.Errors { |
|
|
|
for _, item := range resp.Items { |
|
|
|
for _, i := range item { |
|
|
|
if i.Error != "" { |
|
|
|
return resp, &SearchError{i.Error, i.Status} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
return resp, &SearchError{Msg: "Unknown error while bulk indexing"} |
|
|
|
} |
|
|
|
|
|
|
|
return resp, err |
|
|
|
} |
|
|
|
|
|
|
|
// Search executes a search query against an index |
|
|
|
func (c *Client) Search(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) { |
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
Query: query, |
|
|
|
IndexList: indexList, |
|
|
|
TypeList: typeList, |
|
|
@@ -224,13 +231,12 @@ func (c *Client) Search(query interface{}, indexList []string, typeList []string |
|
|
|
ExtraArgs: extraArgs, |
|
|
|
} |
|
|
|
|
|
|
|
return r.Run() |
|
|
|
return c.Do(&r) |
|
|
|
} |
|
|
|
|
|
|
|
// Count executes a count query against an index, use the Count field in the response for the result |
|
|
|
func (c *Client) Count(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) { |
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
Query: query, |
|
|
|
IndexList: indexList, |
|
|
|
TypeList: typeList, |
|
|
@@ -239,7 +245,7 @@ func (c *Client) Count(query interface{}, indexList []string, typeList []string, |
|
|
|
ExtraArgs: extraArgs, |
|
|
|
} |
|
|
|
|
|
|
|
return r.Run() |
|
|
|
return c.Do(&r) |
|
|
|
} |
|
|
|
|
|
|
|
//Query runs a query against an index using the provided http method. |
|
|
@@ -247,7 +253,6 @@ func (c *Client) Count(query interface{}, indexList []string, typeList []string, |
|
|
|
//for the HTTP method. |
|
|
|
func (c *Client) Query(query interface{}, indexList []string, typeList []string, httpMethod string, extraArgs url.Values) (*Response, error) { |
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
Query: query, |
|
|
|
IndexList: indexList, |
|
|
|
TypeList: typeList, |
|
|
@@ -256,7 +261,7 @@ func (c *Client) Query(query interface{}, indexList []string, typeList []string, |
|
|
|
ExtraArgs: extraArgs, |
|
|
|
} |
|
|
|
|
|
|
|
return r.Run() |
|
|
|
return c.Do(&r) |
|
|
|
} |
|
|
|
|
|
|
|
// Scan starts scroll over an index |
|
|
@@ -267,7 +272,6 @@ func (c *Client) Scan(query interface{}, indexList []string, typeList []string, |
|
|
|
v.Add("size", strconv.Itoa(size)) |
|
|
|
|
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
Query: query, |
|
|
|
IndexList: indexList, |
|
|
|
TypeList: typeList, |
|
|
@@ -276,7 +280,7 @@ func (c *Client) Scan(query interface{}, indexList []string, typeList []string, |
|
|
|
ExtraArgs: v, |
|
|
|
} |
|
|
|
|
|
|
|
return r.Run() |
|
|
|
return c.Do(&r) |
|
|
|
} |
|
|
|
|
|
|
|
// Scroll fetches data by scroll id |
|
|
@@ -285,27 +289,25 @@ func (c *Client) Scroll(scrollID string, timeout string) (*Response, error) { |
|
|
|
v.Add("scroll", timeout) |
|
|
|
|
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
Method: "POST", |
|
|
|
API: "_search/scroll", |
|
|
|
ExtraArgs: v, |
|
|
|
Body: []byte(scrollID), |
|
|
|
} |
|
|
|
|
|
|
|
return r.Run() |
|
|
|
return c.Do(&r) |
|
|
|
} |
|
|
|
|
|
|
|
// Get a typed document by its id |
|
|
|
func (c *Client) Get(index string, documentType string, id string, extraArgs url.Values) (*Response, error) { |
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
IndexList: []string{index}, |
|
|
|
Method: "GET", |
|
|
|
API: documentType + "/" + id, |
|
|
|
ExtraArgs: extraArgs, |
|
|
|
} |
|
|
|
|
|
|
|
return r.Run() |
|
|
|
return c.Do(&r) |
|
|
|
} |
|
|
|
|
|
|
|
// Index indexes a Document |
|
|
@@ -313,7 +315,6 @@ func (c *Client) Get(index string, documentType string, id string, extraArgs url |
|
|
|
// URL arguments, for example, to control routing, ttl, version, op_type, etc. |
|
|
|
func (c *Client) Index(d Document, extraArgs url.Values) (*Response, error) { |
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
Query: d.Fields, |
|
|
|
IndexList: []string{d.Index.(string)}, |
|
|
|
TypeList: []string{d.Type}, |
|
|
@@ -326,7 +327,7 @@ func (c *Client) Index(d Document, extraArgs url.Values) (*Response, error) { |
|
|
|
r.ID = d.ID.(string) |
|
|
|
} |
|
|
|
|
|
|
|
return r.Run() |
|
|
|
return c.Do(&r) |
|
|
|
} |
|
|
|
|
|
|
|
// Delete deletes a Document d |
|
|
@@ -334,7 +335,6 @@ func (c *Client) Index(d Document, extraArgs url.Values) (*Response, error) { |
|
|
|
// URL arguments, for example, to control routing. |
|
|
|
func (c *Client) Delete(d Document, extraArgs url.Values) (*Response, error) { |
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
IndexList: []string{d.Index.(string)}, |
|
|
|
TypeList: []string{d.Type}, |
|
|
|
ExtraArgs: extraArgs, |
|
|
@@ -342,117 +342,7 @@ func (c *Client) Delete(d Document, extraArgs url.Values) (*Response, error) { |
|
|
|
ID: d.ID.(string), |
|
|
|
} |
|
|
|
|
|
|
|
return r.Run() |
|
|
|
} |
|
|
|
|
|
|
|
// Run executes an elasticsearch Request. It converts data to Json, sends the |
|
|
|
// request and returns the Response obtained |
|
|
|
func (req *Request) Run() (*Response, error) { |
|
|
|
body, statusCode, err := req.run() |
|
|
|
esResp := &Response{Status: statusCode} |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
return esResp, err |
|
|
|
} |
|
|
|
|
|
|
|
if req.Method != "HEAD" { |
|
|
|
err = json.Unmarshal(body, &esResp) |
|
|
|
if err != nil { |
|
|
|
return esResp, err |
|
|
|
} |
|
|
|
err = json.Unmarshal(body, &esResp.Raw) |
|
|
|
if err != nil { |
|
|
|
return esResp, err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if req.API == "_bulk" && esResp.Errors { |
|
|
|
for _, item := range esResp.Items { |
|
|
|
for _, i := range item { |
|
|
|
if i.Error != "" { |
|
|
|
return esResp, &SearchError{i.Error, i.Status} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
return esResp, &SearchError{Msg: "Unknown error while bulk indexing"} |
|
|
|
} |
|
|
|
|
|
|
|
if esResp.Error != "" { |
|
|
|
return esResp, &SearchError{esResp.Error, esResp.Status} |
|
|
|
} |
|
|
|
|
|
|
|
return esResp, nil |
|
|
|
} |
|
|
|
|
|
|
|
func (req *Request) run() ([]byte, uint64, error) { |
|
|
|
postData := []byte{} |
|
|
|
|
|
|
|
// XXX : refactor this |
|
|
|
if len(req.Body) > 0 { |
|
|
|
postData = req.Body |
|
|
|
} else if req.API == "_bulk" { |
|
|
|
postData = req.BulkData |
|
|
|
} else { |
|
|
|
b, err := json.Marshal(req.Query) |
|
|
|
if err != nil { |
|
|
|
return nil, 0, err |
|
|
|
} |
|
|
|
postData = b |
|
|
|
} |
|
|
|
|
|
|
|
reader := bytes.NewReader(postData) |
|
|
|
|
|
|
|
newReq, err := http.NewRequest(req.Method, req.URL(), reader) |
|
|
|
if err != nil { |
|
|
|
return nil, 0, err |
|
|
|
} |
|
|
|
|
|
|
|
if req.Method == "POST" || req.Method == "PUT" { |
|
|
|
newReq.Header.Set("Content-Type", "application/json") |
|
|
|
} |
|
|
|
|
|
|
|
resp, err := req.Conn.Client.Do(newReq) |
|
|
|
if err != nil { |
|
|
|
return nil, 0, err |
|
|
|
} |
|
|
|
|
|
|
|
defer resp.Body.Close() |
|
|
|
|
|
|
|
body, err := ioutil.ReadAll(resp.Body) |
|
|
|
if err != nil { |
|
|
|
return nil, uint64(resp.StatusCode), err |
|
|
|
} |
|
|
|
|
|
|
|
if resp.StatusCode > 201 && resp.StatusCode < 400 { |
|
|
|
return nil, uint64(resp.StatusCode), errors.New(string(body)) |
|
|
|
} |
|
|
|
|
|
|
|
return body, uint64(resp.StatusCode), nil |
|
|
|
} |
|
|
|
|
|
|
|
// URL builds a Request for a URL |
|
|
|
func (req *Request) URL() string { |
|
|
|
path := "/" + strings.Join(req.IndexList, ",") |
|
|
|
|
|
|
|
if len(req.TypeList) > 0 { |
|
|
|
path += "/" + strings.Join(req.TypeList, ",") |
|
|
|
} |
|
|
|
|
|
|
|
// XXX : for indexing documents using the normal (non bulk) API |
|
|
|
if len(req.ID) > 0 { |
|
|
|
path += "/" + req.ID |
|
|
|
} |
|
|
|
|
|
|
|
path += "/" + req.API |
|
|
|
|
|
|
|
u := url.URL{ |
|
|
|
Scheme: "http", |
|
|
|
Host: fmt.Sprintf("%s:%s", req.Conn.Host, req.Conn.Port), |
|
|
|
Path: path, |
|
|
|
RawQuery: req.ExtraArgs.Encode(), |
|
|
|
} |
|
|
|
|
|
|
|
return u.String() |
|
|
|
return c.Do(&r) |
|
|
|
} |
|
|
|
|
|
|
|
// Buckets returns list of buckets in aggregation |
|
|
@@ -489,39 +379,36 @@ func (b Bucket) Aggregation(name string) Aggregation { |
|
|
|
func (c *Client) PutMapping(typeName string, mapping interface{}, indexes []string) (*Response, error) { |
|
|
|
|
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
Query: mapping, |
|
|
|
IndexList: indexes, |
|
|
|
Method: "PUT", |
|
|
|
API: "_mappings/" + typeName, |
|
|
|
} |
|
|
|
|
|
|
|
return r.Run() |
|
|
|
return c.Do(&r) |
|
|
|
} |
|
|
|
|
|
|
|
// GetMapping returns the mappings for the specified types |
|
|
|
func (c *Client) GetMapping(types []string, indexes []string) (*Response, error) { |
|
|
|
|
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
IndexList: indexes, |
|
|
|
Method: "GET", |
|
|
|
API: "_mapping/" + strings.Join(types, ","), |
|
|
|
} |
|
|
|
|
|
|
|
return r.Run() |
|
|
|
return c.Do(&r) |
|
|
|
} |
|
|
|
|
|
|
|
// IndicesExist checks whether index (or indices) exist on the server |
|
|
|
func (c *Client) IndicesExist(indexes []string) (bool, error) { |
|
|
|
|
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
IndexList: indexes, |
|
|
|
Method: "HEAD", |
|
|
|
} |
|
|
|
|
|
|
|
resp, err := r.Run() |
|
|
|
resp, err := c.Do(&r) |
|
|
|
|
|
|
|
return resp.Status == 200, err |
|
|
|
} |
|
|
@@ -529,7 +416,6 @@ func (c *Client) IndicesExist(indexes []string) (bool, error) { |
|
|
|
// Update updates the specified document using the _update endpoint |
|
|
|
func (c *Client) Update(d Document, query interface{}, extraArgs url.Values) (*Response, error) { |
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
Query: query, |
|
|
|
IndexList: []string{d.Index.(string)}, |
|
|
|
TypeList: []string{d.Type}, |
|
|
@@ -542,20 +428,19 @@ func (c *Client) Update(d Document, query interface{}, extraArgs url.Values) (*R |
|
|
|
r.ID = d.ID.(string) |
|
|
|
} |
|
|
|
|
|
|
|
return r.Run() |
|
|
|
return c.Do(&r) |
|
|
|
} |
|
|
|
|
|
|
|
// DeleteMapping deletes a mapping along with all data in the type |
|
|
|
func (c *Client) DeleteMapping(typeName string, indexes []string) (*Response, error) { |
|
|
|
|
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
IndexList: indexes, |
|
|
|
Method: "DELETE", |
|
|
|
API: "_mappings/" + typeName, |
|
|
|
} |
|
|
|
|
|
|
|
return r.Run() |
|
|
|
return c.Do(&r) |
|
|
|
} |
|
|
|
|
|
|
|
func (c *Client) modifyAlias(action string, alias string, indexes []string) (*Response, error) { |
|
|
@@ -573,13 +458,12 @@ func (c *Client) modifyAlias(action string, alias string, indexes []string) (*Re |
|
|
|
} |
|
|
|
|
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
Query: command, |
|
|
|
Method: "POST", |
|
|
|
API: "_aliases", |
|
|
|
} |
|
|
|
|
|
|
|
return r.Run() |
|
|
|
return c.Do(&r) |
|
|
|
} |
|
|
|
|
|
|
|
// AddAlias creates an alias to one or more indexes |
|
|
@@ -596,12 +480,64 @@ func (c *Client) RemoveAlias(alias string, indexes []string) (*Response, error) |
|
|
|
func (c *Client) AliasExists(alias string) (bool, error) { |
|
|
|
|
|
|
|
r := Request{ |
|
|
|
Conn: c, |
|
|
|
Method: "HEAD", |
|
|
|
API: "_alias/" + alias, |
|
|
|
} |
|
|
|
|
|
|
|
resp, err := r.Run() |
|
|
|
resp, err := c.Do(&r) |
|
|
|
|
|
|
|
return resp.Status == 200, err |
|
|
|
} |
|
|
|
|
|
|
|
// Do runs the request returned by the requestor and returns the parsed response |
|
|
|
func (c *Client) Do(r Requester) (*Response, error) { |
|
|
|
req, err := r.Request() |
|
|
|
if err != nil { |
|
|
|
return &Response{}, err |
|
|
|
} |
|
|
|
req.URL.Scheme = "http" |
|
|
|
req.URL.Host = fmt.Sprintf("%s:%s", c.Host, c.Port) |
|
|
|
|
|
|
|
body, statusCode, err := c.doRequest(req) |
|
|
|
esResp := &Response{Status: statusCode} |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
return esResp, err |
|
|
|
} |
|
|
|
|
|
|
|
if req.Method != "HEAD" { |
|
|
|
err = json.Unmarshal(body, &esResp) |
|
|
|
if err != nil { |
|
|
|
return esResp, err |
|
|
|
} |
|
|
|
err = json.Unmarshal(body, &esResp.Raw) |
|
|
|
if err != nil { |
|
|
|
return esResp, err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if esResp.Error != "" { |
|
|
|
return esResp, &SearchError{esResp.Error, esResp.Status} |
|
|
|
} |
|
|
|
|
|
|
|
return esResp, nil |
|
|
|
} |
|
|
|
|
|
|
|
func (c *Client) doRequest(req *http.Request) ([]byte, uint64, error) { |
|
|
|
resp, err := c.Client.Do(req) |
|
|
|
if err != nil { |
|
|
|
return nil, 0, err |
|
|
|
} |
|
|
|
defer resp.Body.Close() |
|
|
|
|
|
|
|
body, err := ioutil.ReadAll(resp.Body) |
|
|
|
if err != nil { |
|
|
|
return nil, uint64(resp.StatusCode), err |
|
|
|
} |
|
|
|
|
|
|
|
if resp.StatusCode > 201 && resp.StatusCode < 400 { |
|
|
|
return nil, uint64(resp.StatusCode), errors.New(string(body)) |
|
|
|
} |
|
|
|
|
|
|
|
return body, uint64(resp.StatusCode), nil |
|
|
|
} |