You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

658 lines
16 KiB

  1. // Copyright 2013 Belogik. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package goes provides an API to access Elasticsearch.
  5. package goes
  6. import (
  7. "bytes"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "io/ioutil"
  12. "net/http"
  13. "net/url"
  14. "reflect"
  15. "strconv"
  16. "strings"
  17. )
  18. const (
  19. // BulkCommandIndex specifies a bulk doc should be indexed
  20. BulkCommandIndex = "index"
  21. // BulkCommandDelete specifies a bulk doc should be deleted
  22. BulkCommandDelete = "delete"
  23. )
  24. func (err *SearchError) Error() string {
  25. return fmt.Sprintf("[%d] %s", err.StatusCode, err.Msg)
  26. }
  27. // NewClient initiates a new client for an elasticsearch server
  28. //
  29. // This function is pretty useless for now but might be useful in a near future
  30. // if wee need more features like connection pooling or load balancing.
  31. func NewClient(host string, port string) *Client {
  32. return &Client{host, port, http.DefaultClient, "", "", ""}
  33. }
  34. // WithHTTPClient sets the http.Client to be used with the connection. Returns the original client.
  35. func (c *Client) WithHTTPClient(cl *http.Client) *Client {
  36. c.Client = cl
  37. return c
  38. }
  39. // Version returns the detected version of the connected ES server
  40. func (c *Client) Version() (string, error) {
  41. // Use cached version if it was already fetched
  42. if c.version != "" {
  43. return c.version, nil
  44. }
  45. // Get the version if it was not cached
  46. r := Request{Method: "GET"}
  47. res, err := c.Do(&r)
  48. if err != nil {
  49. return "", err
  50. }
  51. if version, ok := res.Raw["version"].(map[string]interface{}); ok {
  52. if number, ok := version["number"].(string); ok {
  53. c.version = number
  54. return number, nil
  55. }
  56. }
  57. return "", errors.New("No version returned by ElasticSearch Server")
  58. }
  59. // CreateIndex creates a new index represented by a name and a mapping
  60. func (c *Client) CreateIndex(name string, mapping interface{}) (*Response, error) {
  61. r := Request{
  62. Query: mapping,
  63. IndexList: []string{name},
  64. Method: "PUT",
  65. }
  66. return c.Do(&r)
  67. }
  68. // DeleteIndex deletes an index represented by a name
  69. func (c *Client) DeleteIndex(name string) (*Response, error) {
  70. r := Request{
  71. IndexList: []string{name},
  72. Method: "DELETE",
  73. }
  74. return c.Do(&r)
  75. }
  76. // RefreshIndex refreshes an index represented by a name
  77. func (c *Client) RefreshIndex(name string) (*Response, error) {
  78. r := Request{
  79. IndexList: []string{name},
  80. Method: "POST",
  81. API: "_refresh",
  82. }
  83. return c.Do(&r)
  84. }
  85. // UpdateIndexSettings updates settings for existing index represented by a name and a settings
  86. // as described here: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-update-settings.html
  87. func (c *Client) UpdateIndexSettings(name string, settings interface{}) (*Response, error) {
  88. r := Request{
  89. Query: settings,
  90. IndexList: []string{name},
  91. Method: "PUT",
  92. API: "_settings",
  93. }
  94. return c.Do(&r)
  95. }
  96. // Optimize an index represented by a name, extra args are also allowed please check:
  97. // http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-optimize.html#indices-optimize
  98. func (c *Client) Optimize(indexList []string, extraArgs url.Values) (*Response, error) {
  99. r := Request{
  100. IndexList: indexList,
  101. ExtraArgs: extraArgs,
  102. Method: "POST",
  103. API: "_optimize",
  104. }
  105. if version, _ := c.Version(); version > "2.1" {
  106. r.API = "_forcemerge"
  107. }
  108. return c.Do(&r)
  109. }
  110. // ForceMerge is the same as Optimize, but matches the naming of the endpoint as of ES 2.1.0
  111. func (c *Client) ForceMerge(indexList []string, extraArgs url.Values) (*Response, error) {
  112. return c.Optimize(indexList, extraArgs)
  113. }
  114. // Stats fetches statistics (_stats) for the current elasticsearch server
  115. func (c *Client) Stats(indexList []string, extraArgs url.Values) (*Response, error) {
  116. r := Request{
  117. IndexList: indexList,
  118. ExtraArgs: extraArgs,
  119. Method: "GET",
  120. API: "_stats",
  121. }
  122. return c.Do(&r)
  123. }
  124. // IndexStatus fetches the status (_status) for the indices defined in
  125. // indexList. Use _all in indexList to get stats for all indices
  126. func (c *Client) IndexStatus(indexList []string) (*Response, error) {
  127. r := Request{
  128. IndexList: indexList,
  129. Method: "GET",
  130. API: "_status",
  131. }
  132. return c.Do(&r)
  133. }
  134. // BulkSend bulk adds multiple documents in bulk mode
  135. func (c *Client) BulkSend(documents []Document) (*Response, error) {
  136. // We do not generate a traditional JSON here (often a one liner)
  137. // Elasticsearch expects one line of JSON per line (EOL = \n)
  138. // plus an extra \n at the very end of the document
  139. //
  140. // More informations about the Bulk JSON format for Elasticsearch:
  141. //
  142. // - http://www.elasticsearch.org/guide/reference/api/bulk.html
  143. //
  144. // This is quite annoying for us as we can not use the simple JSON
  145. // Marshaler available in Run().
  146. //
  147. // We have to generate this special JSON by ourselves which leads to
  148. // the code below.
  149. //
  150. // I know it is unreadable I must find an elegant way to fix this.
  151. // len(documents) * 2 : action + optional_sources
  152. // + 1 : room for the trailing \n
  153. bulkData := make([][]byte, 0, len(documents)*2+1)
  154. i := 0
  155. for _, doc := range documents {
  156. action, err := json.Marshal(map[string]interface{}{
  157. doc.BulkCommand: map[string]interface{}{
  158. "_index": doc.Index,
  159. "_type": doc.Type,
  160. "_id": doc.ID,
  161. },
  162. })
  163. if err != nil {
  164. return &Response{}, err
  165. }
  166. bulkData = append(bulkData, action)
  167. i++
  168. if doc.Fields != nil {
  169. if docFields, ok := doc.Fields.(map[string]interface{}); ok {
  170. if len(docFields) == 0 {
  171. continue
  172. }
  173. } else {
  174. typeOfFields := reflect.TypeOf(doc.Fields)
  175. if typeOfFields.Kind() == reflect.Ptr {
  176. typeOfFields = typeOfFields.Elem()
  177. }
  178. if typeOfFields.Kind() != reflect.Struct {
  179. return &Response{}, fmt.Errorf("Document fields not in struct or map[string]interface{} format")
  180. }
  181. if typeOfFields.NumField() == 0 {
  182. continue
  183. }
  184. }
  185. sources, err := json.Marshal(doc.Fields)
  186. if err != nil {
  187. return &Response{}, err
  188. }
  189. bulkData = append(bulkData, sources)
  190. i++
  191. }
  192. }
  193. // forces an extra trailing \n absolutely necessary for elasticsearch
  194. bulkData = append(bulkData, []byte(nil))
  195. r := Request{
  196. Method: "POST",
  197. API: "_bulk",
  198. BulkData: bytes.Join(bulkData, []byte("\n")),
  199. }
  200. resp, err := c.Do(&r)
  201. if err != nil {
  202. return resp, err
  203. }
  204. if resp.Errors {
  205. for _, item := range resp.Items {
  206. for _, i := range item {
  207. if i.Error != "" {
  208. return resp, &SearchError{i.Error, i.Status}
  209. }
  210. }
  211. }
  212. return resp, &SearchError{Msg: "Unknown error while bulk indexing"}
  213. }
  214. return resp, err
  215. }
  216. // Search executes a search query against an index
  217. func (c *Client) Search(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) {
  218. r := Request{
  219. Query: query,
  220. IndexList: indexList,
  221. TypeList: typeList,
  222. Method: "POST",
  223. API: "_search",
  224. ExtraArgs: extraArgs,
  225. }
  226. return c.Do(&r)
  227. }
  228. // Count executes a count query against an index, use the Count field in the response for the result
  229. func (c *Client) Count(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) {
  230. r := Request{
  231. Query: query,
  232. IndexList: indexList,
  233. TypeList: typeList,
  234. Method: "POST",
  235. API: "_count",
  236. ExtraArgs: extraArgs,
  237. }
  238. return c.Do(&r)
  239. }
  240. //Query runs a query against an index using the provided http method.
  241. //This method can be used to execute a delete by query, just pass in "DELETE"
  242. //for the HTTP method.
  243. func (c *Client) Query(query interface{}, indexList []string, typeList []string, httpMethod string, extraArgs url.Values) (*Response, error) {
  244. r := Request{
  245. Query: query,
  246. IndexList: indexList,
  247. TypeList: typeList,
  248. Method: httpMethod,
  249. API: "_query",
  250. ExtraArgs: extraArgs,
  251. }
  252. return c.Do(&r)
  253. }
  254. // DeleteByQuery deletes documents matching the specified query. It will return an error for ES 2.x,
  255. // because delete by query support was removed in those versions.
  256. func (c *Client) DeleteByQuery(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) {
  257. version, err := c.Version()
  258. if err != nil {
  259. return nil, err
  260. }
  261. if version > "2" && version < "5" {
  262. return nil, errors.New("ElasticSearch 2.x does not support delete by query")
  263. }
  264. r := Request{
  265. Query: query,
  266. IndexList: indexList,
  267. TypeList: typeList,
  268. Method: "DELETE",
  269. API: "_query",
  270. ExtraArgs: extraArgs,
  271. }
  272. if version > "5" {
  273. r.API = "_delete_by_query"
  274. r.Method = "POST"
  275. }
  276. return c.Do(&r)
  277. }
  278. // Scan starts scroll over an index.
  279. // For ES versions < 5.x, it uses search_type=scan; for 5.x it uses sort=_doc. This means that data
  280. // will be returned in the initial response for 5.x versions, but not for older versions. Code
  281. // wishing to be compatible with both should be written to handle either case.
  282. func (c *Client) Scan(query interface{}, indexList []string, typeList []string, timeout string, size int) (*Response, error) {
  283. v := url.Values{}
  284. version, err := c.Version()
  285. if err != nil {
  286. return nil, err
  287. }
  288. if version > "5" {
  289. v.Add("sort", "_doc")
  290. } else {
  291. v.Add("search_type", "scan")
  292. }
  293. v.Add("scroll", timeout)
  294. v.Add("size", strconv.Itoa(size))
  295. r := Request{
  296. Query: query,
  297. IndexList: indexList,
  298. TypeList: typeList,
  299. Method: "POST",
  300. API: "_search",
  301. ExtraArgs: v,
  302. }
  303. return c.Do(&r)
  304. }
  305. // Scroll fetches data by scroll id
  306. func (c *Client) Scroll(scrollID string, timeout string) (*Response, error) {
  307. r := Request{
  308. Method: "POST",
  309. API: "_search/scroll",
  310. }
  311. if version, err := c.Version(); err != nil {
  312. return nil, err
  313. } else if version > "2" {
  314. r.Body, err = json.Marshal(map[string]string{"scroll": timeout, "scroll_id": scrollID})
  315. if err != nil {
  316. return nil, err
  317. }
  318. } else {
  319. v := url.Values{}
  320. v.Add("scroll", timeout)
  321. v.Add("scroll_id", scrollID)
  322. r.ExtraArgs = v
  323. }
  324. return c.Do(&r)
  325. }
  326. // Get a typed document by its id
  327. func (c *Client) Get(index string, documentType string, id string, extraArgs url.Values) (*Response, error) {
  328. r := Request{
  329. IndexList: []string{index},
  330. Method: "GET",
  331. API: documentType + "/" + id,
  332. ExtraArgs: extraArgs,
  333. }
  334. return c.Do(&r)
  335. }
  336. // Index indexes a Document
  337. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  338. // URL arguments, for example, to control routing, ttl, version, op_type, etc.
  339. func (c *Client) Index(d Document, extraArgs url.Values) (*Response, error) {
  340. r := Request{
  341. Query: d.Fields,
  342. IndexList: []string{d.Index.(string)},
  343. TypeList: []string{d.Type},
  344. ExtraArgs: extraArgs,
  345. Method: "POST",
  346. }
  347. if d.ID != nil {
  348. r.Method = "PUT"
  349. r.ID = d.ID.(string)
  350. }
  351. return c.Do(&r)
  352. }
  353. // Delete deletes a Document d
  354. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  355. // URL arguments, for example, to control routing.
  356. func (c *Client) Delete(d Document, extraArgs url.Values) (*Response, error) {
  357. r := Request{
  358. IndexList: []string{d.Index.(string)},
  359. TypeList: []string{d.Type},
  360. ExtraArgs: extraArgs,
  361. Method: "DELETE",
  362. ID: d.ID.(string),
  363. }
  364. return c.Do(&r)
  365. }
  366. // Buckets returns list of buckets in aggregation
  367. func (a Aggregation) Buckets() []Bucket {
  368. result := []Bucket{}
  369. if buckets, ok := a["buckets"]; ok {
  370. for _, bucket := range buckets.([]interface{}) {
  371. result = append(result, bucket.(map[string]interface{}))
  372. }
  373. }
  374. return result
  375. }
  376. // Key returns key for aggregation bucket
  377. func (b Bucket) Key() interface{} {
  378. return b["key"]
  379. }
  380. // DocCount returns count of documents in this bucket
  381. func (b Bucket) DocCount() uint64 {
  382. return uint64(b["doc_count"].(float64))
  383. }
  384. // Aggregation returns aggregation by name from bucket
  385. func (b Bucket) Aggregation(name string) Aggregation {
  386. if agg, ok := b[name]; ok {
  387. return agg.(map[string]interface{})
  388. }
  389. return Aggregation{}
  390. }
  391. // PutMapping registers a specific mapping for one or more types in one or more indexes
  392. func (c *Client) PutMapping(typeName string, mapping interface{}, indexes []string) (*Response, error) {
  393. r := Request{
  394. Query: mapping,
  395. IndexList: indexes,
  396. Method: "PUT",
  397. API: "_mappings/" + typeName,
  398. }
  399. return c.Do(&r)
  400. }
  401. // GetMapping returns the mappings for the specified types
  402. func (c *Client) GetMapping(types []string, indexes []string) (*Response, error) {
  403. r := Request{
  404. IndexList: indexes,
  405. Method: "GET",
  406. API: "_mapping/" + strings.Join(types, ","),
  407. }
  408. return c.Do(&r)
  409. }
  410. // IndicesExist checks whether index (or indices) exist on the server
  411. func (c *Client) IndicesExist(indexes []string) (bool, error) {
  412. r := Request{
  413. IndexList: indexes,
  414. Method: "HEAD",
  415. }
  416. resp, err := c.Do(&r)
  417. return resp.Status == 200, err
  418. }
  419. // Update updates the specified document using the _update endpoint
  420. func (c *Client) Update(d Document, query interface{}, extraArgs url.Values) (*Response, error) {
  421. r := Request{
  422. Query: query,
  423. IndexList: []string{d.Index.(string)},
  424. TypeList: []string{d.Type},
  425. ExtraArgs: extraArgs,
  426. Method: "POST",
  427. API: "_update",
  428. }
  429. if d.ID != nil {
  430. r.ID = d.ID.(string)
  431. }
  432. return c.Do(&r)
  433. }
  434. // DeleteMapping deletes a mapping along with all data in the type
  435. func (c *Client) DeleteMapping(typeName string, indexes []string) (*Response, error) {
  436. if version, err := c.Version(); err != nil {
  437. return nil, err
  438. } else if version > "2" {
  439. return nil, errors.New("Deletion of mappings is not supported in ES 2.x and above.")
  440. }
  441. r := Request{
  442. IndexList: indexes,
  443. Method: "DELETE",
  444. API: "_mappings/" + typeName,
  445. }
  446. return c.Do(&r)
  447. }
  448. func (c *Client) modifyAlias(action string, alias string, indexes []string) (*Response, error) {
  449. command := map[string]interface{}{
  450. "actions": make([]map[string]interface{}, 0, 1),
  451. }
  452. for _, index := range indexes {
  453. command["actions"] = append(command["actions"].([]map[string]interface{}), map[string]interface{}{
  454. action: map[string]interface{}{
  455. "index": index,
  456. "alias": alias,
  457. },
  458. })
  459. }
  460. r := Request{
  461. Query: command,
  462. Method: "POST",
  463. API: "_aliases",
  464. }
  465. return c.Do(&r)
  466. }
  467. // AddAlias creates an alias to one or more indexes
  468. func (c *Client) AddAlias(alias string, indexes []string) (*Response, error) {
  469. return c.modifyAlias("add", alias, indexes)
  470. }
  471. // RemoveAlias removes an alias to one or more indexes
  472. func (c *Client) RemoveAlias(alias string, indexes []string) (*Response, error) {
  473. return c.modifyAlias("remove", alias, indexes)
  474. }
  475. // AliasExists checks whether alias is defined on the server
  476. func (c *Client) AliasExists(alias string) (bool, error) {
  477. r := Request{
  478. Method: "HEAD",
  479. API: "_alias/" + alias,
  480. }
  481. resp, err := c.Do(&r)
  482. return resp.Status == 200, err
  483. }
  484. func (c *Client) replaceHost(req *http.Request) {
  485. req.URL.Scheme = "http"
  486. req.URL.Host = fmt.Sprintf("%s:%s", c.Host, c.Port)
  487. }
  488. // DoRaw Does the provided requeset and returns the raw bytes and the status code of the response
  489. func (c *Client) DoRaw(r Requester) ([]byte, uint64, error) {
  490. req, err := r.Request()
  491. if err != nil {
  492. return nil, 0, err
  493. }
  494. c.replaceHost(req)
  495. if c.AuthUsername != "" {
  496. req.SetBasicAuth(c.AuthUsername, c.AuthPassword)
  497. }
  498. return c.doRequest(req)
  499. }
  500. // Do runs the request returned by the requestor and returns the parsed response
  501. func (c *Client) Do(r Requester) (*Response, error) {
  502. req, err := r.Request()
  503. if err != nil {
  504. return &Response{}, err
  505. }
  506. c.replaceHost(req)
  507. if c.AuthUsername != "" {
  508. req.SetBasicAuth(c.AuthUsername, c.AuthPassword)
  509. }
  510. body, statusCode, err := c.doRequest(req)
  511. esResp := &Response{Status: statusCode}
  512. if err != nil {
  513. return esResp, err
  514. }
  515. if req.Method != "HEAD" {
  516. err = json.Unmarshal(body, &esResp)
  517. if err != nil {
  518. return esResp, err
  519. }
  520. err = json.Unmarshal(body, &esResp.Raw)
  521. if err != nil {
  522. return esResp, err
  523. }
  524. }
  525. if len(esResp.RawError) > 0 && esResp.RawError[0] == '"' {
  526. json.Unmarshal(esResp.RawError, &esResp.Error)
  527. } else {
  528. esResp.Error = string(esResp.RawError)
  529. }
  530. esResp.RawError = nil
  531. if esResp.Error != "" {
  532. return esResp, &SearchError{esResp.Error, esResp.Status}
  533. }
  534. return esResp, nil
  535. }
  536. func (c *Client) doRequest(req *http.Request) ([]byte, uint64, error) {
  537. resp, err := c.Client.Do(req)
  538. if err != nil {
  539. return nil, 0, err
  540. }
  541. defer resp.Body.Close()
  542. body, err := ioutil.ReadAll(resp.Body)
  543. if err != nil {
  544. return nil, uint64(resp.StatusCode), err
  545. }
  546. if resp.StatusCode > 201 && resp.StatusCode < 400 {
  547. return nil, uint64(resp.StatusCode), errors.New(string(body))
  548. }
  549. return body, uint64(resp.StatusCode), nil
  550. }