You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

goes.go 16 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657
  1. // Copyright 2013 Belogik. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package goes provides an API to access Elasticsearch.
  5. package goes
  6. import (
  7. "bytes"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "io/ioutil"
  12. "net/http"
  13. "net/url"
  14. "reflect"
  15. "strconv"
  16. "strings"
  17. )
  18. const (
  19. // BulkCommandIndex specifies a bulk doc should be indexed
  20. BulkCommandIndex = "index"
  21. // BulkCommandDelete specifies a bulk doc should be deleted
  22. BulkCommandDelete = "delete"
  23. )
  24. func (err *SearchError) Error() string {
  25. return fmt.Sprintf("[%d] %s", err.StatusCode, err.Msg)
  26. }
  27. // NewClient initiates a new client for an elasticsearch server
  28. //
  29. // This function is pretty useless for now but might be useful in a near future
  30. // if wee need more features like connection pooling or load balancing.
  31. func NewClient(host string, port string) *Client {
  32. if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") {
  33. host = "http://" + host
  34. }
  35. host = host + ":" + port
  36. u, err := url.Parse(host)
  37. if err != nil {
  38. panic(err)
  39. }
  40. return &Client{u, http.DefaultClient, ""}
  41. }
  42. // WithHTTPClient sets the http.Client to be used with the connection. Returns the original client.
  43. func (c *Client) WithHTTPClient(cl *http.Client) *Client {
  44. c.Client = cl
  45. return c
  46. }
  47. // Version returns the detected version of the connected ES server
  48. func (c *Client) Version() (string, error) {
  49. // Use cached version if it was already fetched
  50. if c.version != "" {
  51. return c.version, nil
  52. }
  53. // Get the version if it was not cached
  54. r := Request{Method: "GET"}
  55. res, err := c.Do(&r)
  56. if err != nil {
  57. return "", err
  58. }
  59. if version, ok := res.Raw["version"].(map[string]interface{}); ok {
  60. if number, ok := version["number"].(string); ok {
  61. c.version = number
  62. return number, nil
  63. }
  64. }
  65. return "", errors.New("No version returned by ElasticSearch Server")
  66. }
  67. // CreateIndex creates a new index represented by a name and a mapping
  68. func (c *Client) CreateIndex(name string, mapping interface{}) (*Response, error) {
  69. r := Request{
  70. Query: mapping,
  71. IndexList: []string{name},
  72. Method: "PUT",
  73. }
  74. return c.Do(&r)
  75. }
  76. // DeleteIndex deletes an index represented by a name
  77. func (c *Client) DeleteIndex(name string) (*Response, error) {
  78. r := Request{
  79. IndexList: []string{name},
  80. Method: "DELETE",
  81. }
  82. return c.Do(&r)
  83. }
  84. // RefreshIndex refreshes an index represented by a name
  85. func (c *Client) RefreshIndex(name string) (*Response, error) {
  86. r := Request{
  87. IndexList: []string{name},
  88. Method: "POST",
  89. API: "_refresh",
  90. }
  91. return c.Do(&r)
  92. }
  93. // UpdateIndexSettings updates settings for existing index represented by a name and a settings
  94. // as described here: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-update-settings.html
  95. func (c *Client) UpdateIndexSettings(name string, settings interface{}) (*Response, error) {
  96. r := Request{
  97. Query: settings,
  98. IndexList: []string{name},
  99. Method: "PUT",
  100. API: "_settings",
  101. }
  102. return c.Do(&r)
  103. }
  104. // Optimize an index represented by a name, extra args are also allowed please check:
  105. // http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-optimize.html#indices-optimize
  106. func (c *Client) Optimize(indexList []string, extraArgs url.Values) (*Response, error) {
  107. r := Request{
  108. IndexList: indexList,
  109. ExtraArgs: extraArgs,
  110. Method: "POST",
  111. API: "_optimize",
  112. }
  113. if version, _ := c.Version(); version > "2.1" {
  114. r.API = "_forcemerge"
  115. }
  116. return c.Do(&r)
  117. }
  118. // ForceMerge is the same as Optimize, but matches the naming of the endpoint as of ES 2.1.0
  119. func (c *Client) ForceMerge(indexList []string, extraArgs url.Values) (*Response, error) {
  120. return c.Optimize(indexList, extraArgs)
  121. }
  122. // Stats fetches statistics (_stats) for the current elasticsearch server
  123. func (c *Client) Stats(indexList []string, extraArgs url.Values) (*Response, error) {
  124. r := Request{
  125. IndexList: indexList,
  126. ExtraArgs: extraArgs,
  127. Method: "GET",
  128. API: "_stats",
  129. }
  130. return c.Do(&r)
  131. }
  132. // IndexStatus fetches the status (_status) for the indices defined in
  133. // indexList. Use _all in indexList to get stats for all indices
  134. func (c *Client) IndexStatus(indexList []string) (*Response, error) {
  135. r := Request{
  136. IndexList: indexList,
  137. Method: "GET",
  138. API: "_status",
  139. }
  140. return c.Do(&r)
  141. }
  142. // BulkSend bulk adds multiple documents in bulk mode
  143. func (c *Client) BulkSend(documents []Document) (*Response, error) {
  144. // We do not generate a traditional JSON here (often a one liner)
  145. // Elasticsearch expects one line of JSON per line (EOL = \n)
  146. // plus an extra \n at the very end of the document
  147. //
  148. // More informations about the Bulk JSON format for Elasticsearch:
  149. //
  150. // - http://www.elasticsearch.org/guide/reference/api/bulk.html
  151. //
  152. // This is quite annoying for us as we can not use the simple JSON
  153. // Marshaler available in Run().
  154. //
  155. // We have to generate this special JSON by ourselves which leads to
  156. // the code below.
  157. //
  158. // I know it is unreadable I must find an elegant way to fix this.
  159. // len(documents) * 2 : action + optional_sources
  160. // + 1 : room for the trailing \n
  161. bulkData := make([][]byte, 0, len(documents)*2+1)
  162. i := 0
  163. for _, doc := range documents {
  164. action, err := json.Marshal(map[string]interface{}{
  165. doc.BulkCommand: map[string]interface{}{
  166. "_index": doc.Index,
  167. "_type": doc.Type,
  168. "_id": doc.ID,
  169. },
  170. })
  171. if err != nil {
  172. return &Response{}, err
  173. }
  174. bulkData = append(bulkData, action)
  175. i++
  176. if doc.Fields != nil {
  177. if docFields, ok := doc.Fields.(map[string]interface{}); ok {
  178. if len(docFields) == 0 {
  179. continue
  180. }
  181. } else {
  182. typeOfFields := reflect.TypeOf(doc.Fields)
  183. if typeOfFields.Kind() == reflect.Ptr {
  184. typeOfFields = typeOfFields.Elem()
  185. }
  186. if typeOfFields.Kind() != reflect.Struct {
  187. return &Response{}, fmt.Errorf("Document fields not in struct or map[string]interface{} format")
  188. }
  189. if typeOfFields.NumField() == 0 {
  190. continue
  191. }
  192. }
  193. sources, err := json.Marshal(doc.Fields)
  194. if err != nil {
  195. return &Response{}, err
  196. }
  197. bulkData = append(bulkData, sources)
  198. i++
  199. }
  200. }
  201. // forces an extra trailing \n absolutely necessary for elasticsearch
  202. bulkData = append(bulkData, []byte(nil))
  203. r := Request{
  204. Method: "POST",
  205. API: "_bulk",
  206. BulkData: bytes.Join(bulkData, []byte("\n")),
  207. }
  208. resp, err := c.Do(&r)
  209. if err != nil {
  210. return resp, err
  211. }
  212. if resp.Errors {
  213. for _, item := range resp.Items {
  214. for _, i := range item {
  215. if i.Error != "" {
  216. return resp, &SearchError{i.Error, i.Status}
  217. }
  218. }
  219. }
  220. return resp, &SearchError{Msg: "Unknown error while bulk indexing"}
  221. }
  222. return resp, err
  223. }
  224. // Search executes a search query against an index
  225. func (c *Client) Search(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) {
  226. r := Request{
  227. Query: query,
  228. IndexList: indexList,
  229. TypeList: typeList,
  230. Method: "POST",
  231. API: "_search",
  232. ExtraArgs: extraArgs,
  233. }
  234. return c.Do(&r)
  235. }
  236. // Count executes a count query against an index, use the Count field in the response for the result
  237. func (c *Client) Count(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) {
  238. r := Request{
  239. Query: query,
  240. IndexList: indexList,
  241. TypeList: typeList,
  242. Method: "POST",
  243. API: "_count",
  244. ExtraArgs: extraArgs,
  245. }
  246. return c.Do(&r)
  247. }
  248. //Query runs a query against an index using the provided http method.
  249. //This method can be used to execute a delete by query, just pass in "DELETE"
  250. //for the HTTP method.
  251. func (c *Client) Query(query interface{}, indexList []string, typeList []string, httpMethod string, extraArgs url.Values) (*Response, error) {
  252. r := Request{
  253. Query: query,
  254. IndexList: indexList,
  255. TypeList: typeList,
  256. Method: httpMethod,
  257. API: "_query",
  258. ExtraArgs: extraArgs,
  259. }
  260. return c.Do(&r)
  261. }
  262. // DeleteByQuery deletes documents matching the specified query. It will return an error for ES 2.x,
  263. // because delete by query support was removed in those versions.
  264. func (c *Client) DeleteByQuery(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) {
  265. version, err := c.Version()
  266. if err != nil {
  267. return nil, err
  268. }
  269. if version > "2" && version < "5" {
  270. return nil, errors.New("ElasticSearch 2.x does not support delete by query")
  271. }
  272. r := Request{
  273. Query: query,
  274. IndexList: indexList,
  275. TypeList: typeList,
  276. Method: "DELETE",
  277. API: "_query",
  278. ExtraArgs: extraArgs,
  279. }
  280. if version > "5" {
  281. r.API = "_delete_by_query"
  282. r.Method = "POST"
  283. }
  284. return c.Do(&r)
  285. }
  286. // Scan starts scroll over an index.
  287. // For ES versions < 5.x, it uses search_type=scan; for 5.x it uses sort=_doc. This means that data
  288. // will be returned in the initial response for 5.x versions, but not for older versions. Code
  289. // wishing to be compatible with both should be written to handle either case.
  290. func (c *Client) Scan(query interface{}, indexList []string, typeList []string, timeout string, size int) (*Response, error) {
  291. v := url.Values{}
  292. version, err := c.Version()
  293. if err != nil {
  294. return nil, err
  295. }
  296. if version > "5" {
  297. v.Add("sort", "_doc")
  298. } else {
  299. v.Add("search_type", "scan")
  300. }
  301. v.Add("scroll", timeout)
  302. v.Add("size", strconv.Itoa(size))
  303. r := Request{
  304. Query: query,
  305. IndexList: indexList,
  306. TypeList: typeList,
  307. Method: "POST",
  308. API: "_search",
  309. ExtraArgs: v,
  310. }
  311. return c.Do(&r)
  312. }
  313. // Scroll fetches data by scroll id
  314. func (c *Client) Scroll(scrollID string, timeout string) (*Response, error) {
  315. r := Request{
  316. Method: "POST",
  317. API: "_search/scroll",
  318. }
  319. if version, err := c.Version(); err != nil {
  320. return nil, err
  321. } else if version > "2" {
  322. r.Body, err = json.Marshal(map[string]string{"scroll": timeout, "scroll_id": scrollID})
  323. if err != nil {
  324. return nil, err
  325. }
  326. } else {
  327. v := url.Values{}
  328. v.Add("scroll", timeout)
  329. v.Add("scroll_id", scrollID)
  330. r.ExtraArgs = v
  331. }
  332. return c.Do(&r)
  333. }
  334. // Get a typed document by its id
  335. func (c *Client) Get(index string, documentType string, id string, extraArgs url.Values) (*Response, error) {
  336. r := Request{
  337. IndexList: []string{index},
  338. Method: "GET",
  339. API: documentType + "/" + id,
  340. ExtraArgs: extraArgs,
  341. }
  342. return c.Do(&r)
  343. }
  344. // Index indexes a Document
  345. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  346. // URL arguments, for example, to control routing, ttl, version, op_type, etc.
  347. func (c *Client) Index(d Document, extraArgs url.Values) (*Response, error) {
  348. r := Request{
  349. Query: d.Fields,
  350. IndexList: []string{d.Index.(string)},
  351. TypeList: []string{d.Type},
  352. ExtraArgs: extraArgs,
  353. Method: "POST",
  354. }
  355. if d.ID != nil {
  356. r.Method = "PUT"
  357. r.ID = d.ID.(string)
  358. }
  359. return c.Do(&r)
  360. }
  361. // Delete deletes a Document d
  362. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  363. // URL arguments, for example, to control routing.
  364. func (c *Client) Delete(d Document, extraArgs url.Values) (*Response, error) {
  365. r := Request{
  366. IndexList: []string{d.Index.(string)},
  367. TypeList: []string{d.Type},
  368. ExtraArgs: extraArgs,
  369. Method: "DELETE",
  370. ID: d.ID.(string),
  371. }
  372. return c.Do(&r)
  373. }
  374. // Buckets returns list of buckets in aggregation
  375. func (a Aggregation) Buckets() []Bucket {
  376. result := []Bucket{}
  377. if buckets, ok := a["buckets"]; ok {
  378. for _, bucket := range buckets.([]interface{}) {
  379. result = append(result, bucket.(map[string]interface{}))
  380. }
  381. }
  382. return result
  383. }
  384. // Key returns key for aggregation bucket
  385. func (b Bucket) Key() interface{} {
  386. return b["key"]
  387. }
  388. // DocCount returns count of documents in this bucket
  389. func (b Bucket) DocCount() uint64 {
  390. return uint64(b["doc_count"].(float64))
  391. }
  392. // Aggregation returns aggregation by name from bucket
  393. func (b Bucket) Aggregation(name string) Aggregation {
  394. if agg, ok := b[name]; ok {
  395. return agg.(map[string]interface{})
  396. }
  397. return Aggregation{}
  398. }
  399. // PutMapping registers a specific mapping for one or more types in one or more indexes
  400. func (c *Client) PutMapping(typeName string, mapping interface{}, indexes []string) (*Response, error) {
  401. r := Request{
  402. Query: mapping,
  403. IndexList: indexes,
  404. Method: "PUT",
  405. API: "_mappings/" + typeName,
  406. }
  407. return c.Do(&r)
  408. }
  409. // GetMapping returns the mappings for the specified types
  410. func (c *Client) GetMapping(types []string, indexes []string) (*Response, error) {
  411. r := Request{
  412. IndexList: indexes,
  413. Method: "GET",
  414. API: "_mapping/" + strings.Join(types, ","),
  415. }
  416. return c.Do(&r)
  417. }
  418. // IndicesExist checks whether index (or indices) exist on the server
  419. func (c *Client) IndicesExist(indexes []string) (bool, error) {
  420. r := Request{
  421. IndexList: indexes,
  422. Method: "HEAD",
  423. }
  424. resp, err := c.Do(&r)
  425. return resp.Status == 200, err
  426. }
  427. // Update updates the specified document using the _update endpoint
  428. func (c *Client) Update(d Document, query interface{}, extraArgs url.Values) (*Response, error) {
  429. r := Request{
  430. Query: query,
  431. IndexList: []string{d.Index.(string)},
  432. TypeList: []string{d.Type},
  433. ExtraArgs: extraArgs,
  434. Method: "POST",
  435. API: "_update",
  436. }
  437. if d.ID != nil {
  438. r.ID = d.ID.(string)
  439. }
  440. return c.Do(&r)
  441. }
  442. // DeleteMapping deletes a mapping along with all data in the type
  443. func (c *Client) DeleteMapping(typeName string, indexes []string) (*Response, error) {
  444. if version, err := c.Version(); err != nil {
  445. return nil, err
  446. } else if version > "2" {
  447. return nil, errors.New("Deletion of mappings is not supported in ES 2.x and above.")
  448. }
  449. r := Request{
  450. IndexList: indexes,
  451. Method: "DELETE",
  452. API: "_mappings/" + typeName,
  453. }
  454. return c.Do(&r)
  455. }
  456. func (c *Client) modifyAlias(action string, alias string, indexes []string) (*Response, error) {
  457. command := map[string]interface{}{
  458. "actions": make([]map[string]interface{}, 0, 1),
  459. }
  460. for _, index := range indexes {
  461. command["actions"] = append(command["actions"].([]map[string]interface{}), map[string]interface{}{
  462. action: map[string]interface{}{
  463. "index": index,
  464. "alias": alias,
  465. },
  466. })
  467. }
  468. r := Request{
  469. Query: command,
  470. Method: "POST",
  471. API: "_aliases",
  472. }
  473. return c.Do(&r)
  474. }
  475. // AddAlias creates an alias to one or more indexes
  476. func (c *Client) AddAlias(alias string, indexes []string) (*Response, error) {
  477. return c.modifyAlias("add", alias, indexes)
  478. }
  479. // RemoveAlias removes an alias to one or more indexes
  480. func (c *Client) RemoveAlias(alias string, indexes []string) (*Response, error) {
  481. return c.modifyAlias("remove", alias, indexes)
  482. }
  483. // AliasExists checks whether alias is defined on the server
  484. func (c *Client) AliasExists(alias string) (bool, error) {
  485. r := Request{
  486. Method: "HEAD",
  487. API: "_alias/" + alias,
  488. }
  489. resp, err := c.Do(&r)
  490. return resp.Status == 200, err
  491. }
  492. func (c *Client) replaceHost(req *http.Request) {
  493. req.URL.User = c.Host.User
  494. req.URL.Scheme = c.Host.Scheme
  495. req.URL.Host = c.Host.Host
  496. }
  497. // DoRaw Does the provided requeset and returns the raw bytes and the status code of the response
  498. func (c *Client) DoRaw(r Requester) ([]byte, uint64, error) {
  499. req, err := r.Request()
  500. if err != nil {
  501. return nil, 0, err
  502. }
  503. c.replaceHost(req)
  504. return c.doRequest(req)
  505. }
  506. // Do runs the request returned by the requestor and returns the parsed response
  507. func (c *Client) Do(r Requester) (*Response, error) {
  508. req, err := r.Request()
  509. if err != nil {
  510. return &Response{}, err
  511. }
  512. c.replaceHost(req)
  513. body, statusCode, err := c.doRequest(req)
  514. esResp := &Response{Status: statusCode}
  515. if err != nil {
  516. return esResp, err
  517. }
  518. if req.Method != "HEAD" {
  519. err = json.Unmarshal(body, &esResp)
  520. if err != nil {
  521. return esResp, err
  522. }
  523. err = json.Unmarshal(body, &esResp.Raw)
  524. if err != nil {
  525. return esResp, err
  526. }
  527. }
  528. if len(esResp.RawError) > 0 && esResp.RawError[0] == '"' {
  529. json.Unmarshal(esResp.RawError, &esResp.Error)
  530. } else {
  531. esResp.Error = string(esResp.RawError)
  532. }
  533. esResp.RawError = nil
  534. if esResp.Error != "" {
  535. return esResp, &SearchError{esResp.Error, esResp.Status}
  536. }
  537. return esResp, nil
  538. }
  539. func (c *Client) doRequest(req *http.Request) ([]byte, uint64, error) {
  540. resp, err := c.Client.Do(req)
  541. if err != nil {
  542. return nil, 0, err
  543. }
  544. defer resp.Body.Close()
  545. body, err := ioutil.ReadAll(resp.Body)
  546. if err != nil {
  547. return nil, uint64(resp.StatusCode), err
  548. }
  549. if resp.StatusCode > 201 && resp.StatusCode < 400 {
  550. return nil, uint64(resp.StatusCode), errors.New(string(body))
  551. }
  552. return body, uint64(resp.StatusCode), nil
  553. }