You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

goes.go 16 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
9 years ago
9 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657
  1. // Copyright 2013 Belogik. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package goes provides an API to access Elasticsearch.
  5. package goes
  6. import (
  7. "bytes"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "io/ioutil"
  12. "net/http"
  13. "net/url"
  14. "reflect"
  15. "strconv"
  16. "strings"
  17. )
  18. const (
  19. // BulkCommandIndex specifies a bulk doc should be indexed
  20. BulkCommandIndex = "index"
  21. // BulkCommandDelete specifies a bulk doc should be deleted
  22. BulkCommandDelete = "delete"
  23. )
  24. func (err *SearchError) Error() string {
  25. return fmt.Sprintf("[%d] %s", err.StatusCode, err.Msg)
  26. }
  27. // NewClient initiates a new client for an elasticsearch server
  28. //
  29. // This function is pretty useless for now but might be useful in a near future
  30. // if wee need more features like connection pooling or load balancing.
  31. func NewClient(host string, port string) *Client {
  32. return &Client{host, port, http.DefaultClient, "", "", ""}
  33. }
  34. // WithHTTPClient sets the http.Client to be used with the connection. Returns the original client.
  35. func (c *Client) WithHTTPClient(cl *http.Client) *Client {
  36. c.Client = cl
  37. return c
  38. }
  39. // Version returns the detected version of the connected ES server
  40. func (c *Client) Version() (string, error) {
  41. // Use cached version if it was already fetched
  42. if c.version != "" {
  43. return c.version, nil
  44. }
  45. // Get the version if it was not cached
  46. r := Request{Method: "GET"}
  47. res, err := c.Do(&r)
  48. if err != nil {
  49. return "", err
  50. }
  51. if version, ok := res.Raw["version"].(map[string]interface{}); ok {
  52. if number, ok := version["number"].(string); ok {
  53. c.version = number
  54. return number, nil
  55. }
  56. }
  57. return "", errors.New("No version returned by ElasticSearch Server")
  58. }
  59. // CreateIndex creates a new index represented by a name and a mapping
  60. func (c *Client) CreateIndex(name string, mapping interface{}) (*Response, error) {
  61. r := Request{
  62. Query: mapping,
  63. IndexList: []string{name},
  64. Method: "PUT",
  65. }
  66. return c.Do(&r)
  67. }
  68. // DeleteIndex deletes an index represented by a name
  69. func (c *Client) DeleteIndex(name string) (*Response, error) {
  70. r := Request{
  71. IndexList: []string{name},
  72. Method: "DELETE",
  73. }
  74. return c.Do(&r)
  75. }
  76. // RefreshIndex refreshes an index represented by a name
  77. func (c *Client) RefreshIndex(name string) (*Response, error) {
  78. r := Request{
  79. IndexList: []string{name},
  80. Method: "POST",
  81. API: "_refresh",
  82. }
  83. return c.Do(&r)
  84. }
  85. // UpdateIndexSettings updates settings for existing index represented by a name and a settings
  86. // as described here: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-update-settings.html
  87. func (c *Client) UpdateIndexSettings(name string, settings interface{}) (*Response, error) {
  88. r := Request{
  89. Query: settings,
  90. IndexList: []string{name},
  91. Method: "PUT",
  92. API: "_settings",
  93. }
  94. return c.Do(&r)
  95. }
  96. // Optimize an index represented by a name, extra args are also allowed please check:
  97. // http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-optimize.html#indices-optimize
  98. func (c *Client) Optimize(indexList []string, extraArgs url.Values) (*Response, error) {
  99. r := Request{
  100. IndexList: indexList,
  101. ExtraArgs: extraArgs,
  102. Method: "POST",
  103. API: "_optimize",
  104. }
  105. if version, _ := c.Version(); version > "2.1" {
  106. r.API = "_forcemerge"
  107. }
  108. return c.Do(&r)
  109. }
  110. // ForceMerge is the same as Optimize, but matches the naming of the endpoint as of ES 2.1.0
  111. func (c *Client) ForceMerge(indexList []string, extraArgs url.Values) (*Response, error) {
  112. return c.Optimize(indexList, extraArgs)
  113. }
  114. // Stats fetches statistics (_stats) for the current elasticsearch server
  115. func (c *Client) Stats(indexList []string, extraArgs url.Values) (*Response, error) {
  116. r := Request{
  117. IndexList: indexList,
  118. ExtraArgs: extraArgs,
  119. Method: "GET",
  120. API: "_stats",
  121. }
  122. return c.Do(&r)
  123. }
  124. // IndexStatus fetches the status (_status) for the indices defined in
  125. // indexList. Use _all in indexList to get stats for all indices
  126. func (c *Client) IndexStatus(indexList []string) (*Response, error) {
  127. r := Request{
  128. IndexList: indexList,
  129. Method: "GET",
  130. API: "_status",
  131. }
  132. return c.Do(&r)
  133. }
  134. // BulkSend bulk adds multiple documents in bulk mode
  135. func (c *Client) BulkSend(documents []Document) (*Response, error) {
  136. // We do not generate a traditional JSON here (often a one liner)
  137. // Elasticsearch expects one line of JSON per line (EOL = \n)
  138. // plus an extra \n at the very end of the document
  139. //
  140. // More informations about the Bulk JSON format for Elasticsearch:
  141. //
  142. // - http://www.elasticsearch.org/guide/reference/api/bulk.html
  143. //
  144. // This is quite annoying for us as we can not use the simple JSON
  145. // Marshaler available in Run().
  146. //
  147. // We have to generate this special JSON by ourselves which leads to
  148. // the code below.
  149. //
  150. // I know it is unreadable I must find an elegant way to fix this.
  151. // len(documents) * 2 : action + optional_sources
  152. // + 1 : room for the trailing \n
  153. bulkData := make([][]byte, 0, len(documents)*2+1)
  154. i := 0
  155. for _, doc := range documents {
  156. action, err := json.Marshal(map[string]interface{}{
  157. doc.BulkCommand: map[string]interface{}{
  158. "_index": doc.Index,
  159. "_type": doc.Type,
  160. "_id": doc.ID,
  161. },
  162. })
  163. if err != nil {
  164. return &Response{}, err
  165. }
  166. bulkData = append(bulkData, action)
  167. i++
  168. if doc.Fields != nil {
  169. if docFields, ok := doc.Fields.(map[string]interface{}); ok {
  170. if len(docFields) == 0 {
  171. continue
  172. }
  173. } else {
  174. typeOfFields := reflect.TypeOf(doc.Fields)
  175. if typeOfFields.Kind() == reflect.Ptr {
  176. typeOfFields = typeOfFields.Elem()
  177. }
  178. if typeOfFields.Kind() != reflect.Struct {
  179. return &Response{}, fmt.Errorf("Document fields not in struct or map[string]interface{} format")
  180. }
  181. if typeOfFields.NumField() == 0 {
  182. continue
  183. }
  184. }
  185. sources, err := json.Marshal(doc.Fields)
  186. if err != nil {
  187. return &Response{}, err
  188. }
  189. bulkData = append(bulkData, sources)
  190. i++
  191. }
  192. }
  193. // forces an extra trailing \n absolutely necessary for elasticsearch
  194. bulkData = append(bulkData, []byte(nil))
  195. r := Request{
  196. Method: "POST",
  197. API: "_bulk",
  198. BulkData: bytes.Join(bulkData, []byte("\n")),
  199. }
  200. resp, err := c.Do(&r)
  201. if err != nil {
  202. return resp, err
  203. }
  204. if resp.Errors {
  205. for _, item := range resp.Items {
  206. for _, i := range item {
  207. if i.Error != "" {
  208. return resp, &SearchError{i.Error, i.Status}
  209. }
  210. }
  211. }
  212. return resp, &SearchError{Msg: "Unknown error while bulk indexing"}
  213. }
  214. return resp, err
  215. }
  216. // Search executes a search query against an index
  217. func (c *Client) Search(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) {
  218. r := Request{
  219. Query: query,
  220. IndexList: indexList,
  221. TypeList: typeList,
  222. Method: "POST",
  223. API: "_search",
  224. ExtraArgs: extraArgs,
  225. }
  226. return c.Do(&r)
  227. }
  228. // Count executes a count query against an index, use the Count field in the response for the result
  229. func (c *Client) Count(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) {
  230. r := Request{
  231. Query: query,
  232. IndexList: indexList,
  233. TypeList: typeList,
  234. Method: "POST",
  235. API: "_count",
  236. ExtraArgs: extraArgs,
  237. }
  238. return c.Do(&r)
  239. }
  240. //Query runs a query against an index using the provided http method.
  241. //This method can be used to execute a delete by query, just pass in "DELETE"
  242. //for the HTTP method.
  243. func (c *Client) Query(query interface{}, indexList []string, typeList []string, httpMethod string, extraArgs url.Values) (*Response, error) {
  244. r := Request{
  245. Query: query,
  246. IndexList: indexList,
  247. TypeList: typeList,
  248. Method: httpMethod,
  249. API: "_query",
  250. ExtraArgs: extraArgs,
  251. }
  252. return c.Do(&r)
  253. }
  254. // DeleteByQuery deletes documents matching the specified query. It will return an error for ES 2.x,
  255. // because delete by query support was removed in those versions.
  256. func (c *Client) DeleteByQuery(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) {
  257. version, err := c.Version()
  258. if err != nil {
  259. return nil, err
  260. }
  261. if version > "2" && version < "5" {
  262. return nil, errors.New("ElasticSearch 2.x does not support delete by query")
  263. }
  264. r := Request{
  265. Query: query,
  266. IndexList: indexList,
  267. TypeList: typeList,
  268. Method: "DELETE",
  269. API: "_query",
  270. ExtraArgs: extraArgs,
  271. }
  272. if version > "5" {
  273. r.API = "_delete_by_query"
  274. r.Method = "POST"
  275. }
  276. return c.Do(&r)
  277. }
  278. // Scan starts scroll over an index.
  279. // For ES versions < 5.x, it uses search_type=scan; for 5.x it uses sort=_doc. This means that data
  280. // will be returned in the initial response for 5.x versions, but not for older versions. Code
  281. // wishing to be compatible with both should be written to handle either case.
  282. func (c *Client) Scan(query interface{}, indexList []string, typeList []string, timeout string, size int) (*Response, error) {
  283. v := url.Values{}
  284. version, err := c.Version()
  285. if err != nil {
  286. return nil, err
  287. }
  288. if version > "5" {
  289. v.Add("sort", "_doc")
  290. } else {
  291. v.Add("search_type", "scan")
  292. }
  293. v.Add("scroll", timeout)
  294. v.Add("size", strconv.Itoa(size))
  295. r := Request{
  296. Query: query,
  297. IndexList: indexList,
  298. TypeList: typeList,
  299. Method: "POST",
  300. API: "_search",
  301. ExtraArgs: v,
  302. }
  303. return c.Do(&r)
  304. }
  305. // Scroll fetches data by scroll id
  306. func (c *Client) Scroll(scrollID string, timeout string) (*Response, error) {
  307. r := Request{
  308. Method: "POST",
  309. API: "_search/scroll",
  310. }
  311. if version, err := c.Version(); err != nil {
  312. return nil, err
  313. } else if version > "2" {
  314. r.Body, err = json.Marshal(map[string]string{"scroll": timeout, "scroll_id": scrollID})
  315. if err != nil {
  316. return nil, err
  317. }
  318. } else {
  319. v := url.Values{}
  320. v.Add("scroll", timeout)
  321. v.Add("scroll_id", scrollID)
  322. r.ExtraArgs = v
  323. }
  324. return c.Do(&r)
  325. }
  326. // Get a typed document by its id
  327. func (c *Client) Get(index string, documentType string, id string, extraArgs url.Values) (*Response, error) {
  328. r := Request{
  329. IndexList: []string{index},
  330. Method: "GET",
  331. API: documentType + "/" + id,
  332. ExtraArgs: extraArgs,
  333. }
  334. return c.Do(&r)
  335. }
  336. // Index indexes a Document
  337. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  338. // URL arguments, for example, to control routing, ttl, version, op_type, etc.
  339. func (c *Client) Index(d Document, extraArgs url.Values) (*Response, error) {
  340. r := Request{
  341. Query: d.Fields,
  342. IndexList: []string{d.Index.(string)},
  343. TypeList: []string{d.Type},
  344. ExtraArgs: extraArgs,
  345. Method: "POST",
  346. }
  347. if d.ID != nil {
  348. r.Method = "PUT"
  349. r.ID = d.ID.(string)
  350. }
  351. return c.Do(&r)
  352. }
  353. // Delete deletes a Document d
  354. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  355. // URL arguments, for example, to control routing.
  356. func (c *Client) Delete(d Document, extraArgs url.Values) (*Response, error) {
  357. r := Request{
  358. IndexList: []string{d.Index.(string)},
  359. TypeList: []string{d.Type},
  360. ExtraArgs: extraArgs,
  361. Method: "DELETE",
  362. ID: d.ID.(string),
  363. }
  364. return c.Do(&r)
  365. }
  366. // Buckets returns list of buckets in aggregation
  367. func (a Aggregation) Buckets() []Bucket {
  368. result := []Bucket{}
  369. if buckets, ok := a["buckets"]; ok {
  370. for _, bucket := range buckets.([]interface{}) {
  371. result = append(result, bucket.(map[string]interface{}))
  372. }
  373. }
  374. return result
  375. }
  376. // Key returns key for aggregation bucket
  377. func (b Bucket) Key() interface{} {
  378. return b["key"]
  379. }
  380. // DocCount returns count of documents in this bucket
  381. func (b Bucket) DocCount() uint64 {
  382. return uint64(b["doc_count"].(float64))
  383. }
  384. // Aggregation returns aggregation by name from bucket
  385. func (b Bucket) Aggregation(name string) Aggregation {
  386. if agg, ok := b[name]; ok {
  387. return agg.(map[string]interface{})
  388. }
  389. return Aggregation{}
  390. }
  391. // PutMapping registers a specific mapping for one or more types in one or more indexes
  392. func (c *Client) PutMapping(typeName string, mapping interface{}, indexes []string) (*Response, error) {
  393. r := Request{
  394. Query: mapping,
  395. IndexList: indexes,
  396. Method: "PUT",
  397. API: "_mappings/" + typeName,
  398. }
  399. return c.Do(&r)
  400. }
  401. // GetMapping returns the mappings for the specified types
  402. func (c *Client) GetMapping(types []string, indexes []string) (*Response, error) {
  403. r := Request{
  404. IndexList: indexes,
  405. Method: "GET",
  406. API: "_mapping/" + strings.Join(types, ","),
  407. }
  408. return c.Do(&r)
  409. }
  410. // IndicesExist checks whether index (or indices) exist on the server
  411. func (c *Client) IndicesExist(indexes []string) (bool, error) {
  412. r := Request{
  413. IndexList: indexes,
  414. Method: "HEAD",
  415. }
  416. resp, err := c.Do(&r)
  417. return resp.Status == 200, err
  418. }
  419. // Update updates the specified document using the _update endpoint
  420. func (c *Client) Update(d Document, query interface{}, extraArgs url.Values) (*Response, error) {
  421. r := Request{
  422. Query: query,
  423. IndexList: []string{d.Index.(string)},
  424. TypeList: []string{d.Type},
  425. ExtraArgs: extraArgs,
  426. Method: "POST",
  427. API: "_update",
  428. }
  429. if d.ID != nil {
  430. r.ID = d.ID.(string)
  431. }
  432. return c.Do(&r)
  433. }
  434. // DeleteMapping deletes a mapping along with all data in the type
  435. func (c *Client) DeleteMapping(typeName string, indexes []string) (*Response, error) {
  436. if version, err := c.Version(); err != nil {
  437. return nil, err
  438. } else if version > "2" {
  439. return nil, errors.New("Deletion of mappings is not supported in ES 2.x and above.")
  440. }
  441. r := Request{
  442. IndexList: indexes,
  443. Method: "DELETE",
  444. API: "_mappings/" + typeName,
  445. }
  446. return c.Do(&r)
  447. }
  448. func (c *Client) modifyAlias(action string, alias string, indexes []string) (*Response, error) {
  449. command := map[string]interface{}{
  450. "actions": make([]map[string]interface{}, 0, 1),
  451. }
  452. for _, index := range indexes {
  453. command["actions"] = append(command["actions"].([]map[string]interface{}), map[string]interface{}{
  454. action: map[string]interface{}{
  455. "index": index,
  456. "alias": alias,
  457. },
  458. })
  459. }
  460. r := Request{
  461. Query: command,
  462. Method: "POST",
  463. API: "_aliases",
  464. }
  465. return c.Do(&r)
  466. }
  467. // AddAlias creates an alias to one or more indexes
  468. func (c *Client) AddAlias(alias string, indexes []string) (*Response, error) {
  469. return c.modifyAlias("add", alias, indexes)
  470. }
  471. // RemoveAlias removes an alias to one or more indexes
  472. func (c *Client) RemoveAlias(alias string, indexes []string) (*Response, error) {
  473. return c.modifyAlias("remove", alias, indexes)
  474. }
  475. // AliasExists checks whether alias is defined on the server
  476. func (c *Client) AliasExists(alias string) (bool, error) {
  477. r := Request{
  478. Method: "HEAD",
  479. API: "_alias/" + alias,
  480. }
  481. resp, err := c.Do(&r)
  482. return resp.Status == 200, err
  483. }
  484. func (c *Client) replaceHost(req *http.Request) {
  485. req.URL.Scheme = "http"
  486. req.URL.Host = fmt.Sprintf("%s:%s", c.Host, c.Port)
  487. }
  488. // DoRaw Does the provided requeset and returns the raw bytes and the status code of the response
  489. func (c *Client) DoRaw(r Requester) ([]byte, uint64, error) {
  490. req, err := r.Request()
  491. if err != nil {
  492. return nil, 0, err
  493. }
  494. c.replaceHost(req)
  495. if c.AuthUsername != "" {
  496. req.SetBasicAuth(c.AuthUsername, c.AuthPassword)
  497. }
  498. return c.doRequest(req)
  499. }
  500. // Do runs the request returned by the requestor and returns the parsed response
  501. func (c *Client) Do(r Requester) (*Response, error) {
  502. req, err := r.Request()
  503. if err != nil {
  504. return &Response{}, err
  505. }
  506. c.replaceHost(req)
  507. if c.AuthUsername != "" {
  508. req.SetBasicAuth(c.AuthUsername, c.AuthPassword)
  509. }
  510. body, statusCode, err := c.doRequest(req)
  511. esResp := &Response{Status: statusCode}
  512. if err != nil {
  513. return esResp, err
  514. }
  515. if req.Method != "HEAD" {
  516. err = json.Unmarshal(body, &esResp)
  517. if err != nil {
  518. return esResp, err
  519. }
  520. err = json.Unmarshal(body, &esResp.Raw)
  521. if err != nil {
  522. return esResp, err
  523. }
  524. }
  525. if len(esResp.RawError) > 0 && esResp.RawError[0] == '"' {
  526. json.Unmarshal(esResp.RawError, &esResp.Error)
  527. } else {
  528. esResp.Error = string(esResp.RawError)
  529. }
  530. esResp.RawError = nil
  531. if esResp.Error != "" {
  532. return esResp, &SearchError{esResp.Error, esResp.Status}
  533. }
  534. return esResp, nil
  535. }
  536. func (c *Client) doRequest(req *http.Request) ([]byte, uint64, error) {
  537. resp, err := c.Client.Do(req)
  538. if err != nil {
  539. return nil, 0, err
  540. }
  541. defer resp.Body.Close()
  542. body, err := ioutil.ReadAll(resp.Body)
  543. if err != nil {
  544. return nil, uint64(resp.StatusCode), err
  545. }
  546. if resp.StatusCode > 201 && resp.StatusCode < 400 {
  547. return nil, uint64(resp.StatusCode), errors.New(string(body))
  548. }
  549. return body, uint64(resp.StatusCode), nil
  550. }