You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

608 lines
14 KiB

  1. // Copyright 2013 Belogik. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package goes provides an API to access Elasticsearch.
  5. package goes
  6. import (
  7. "bytes"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "io/ioutil"
  12. "net/http"
  13. "net/url"
  14. "reflect"
  15. "strconv"
  16. "strings"
  17. )
  // Bulk command names for Document.BulkCommand. BulkSend uses the value as the
  // single key of each document's action line.
  const (
  	// BulkCommandIndex requests that the bulk document be indexed.
  	BulkCommandIndex = "index"
  	// BulkCommandDelete requests that the bulk document be deleted.
  	BulkCommandDelete = "delete"
  )
  24. func (err *SearchError) Error() string {
  25. return fmt.Sprintf("[%d] %s", err.StatusCode, err.Msg)
  26. }
  27. // NewClient initiates a new client for an elasticsearch server
  28. //
  29. // This function is pretty useless for now but might be useful in a near future
  30. // if wee need more features like connection pooling or load balancing.
  31. func NewClient(host string, port string) *Client {
  32. return &Client{host, port, http.DefaultClient}
  33. }
  34. // WithHTTPClient sets the http.Client to be used with the connection. Returns the original client.
  35. func (c *Client) WithHTTPClient(cl *http.Client) *Client {
  36. c.Client = cl
  37. return c
  38. }
  39. // CreateIndex creates a new index represented by a name and a mapping
  40. func (c *Client) CreateIndex(name string, mapping interface{}) (*Response, error) {
  41. r := Request{
  42. Conn: c,
  43. Query: mapping,
  44. IndexList: []string{name},
  45. method: "PUT",
  46. }
  47. return r.Run()
  48. }
  49. // DeleteIndex deletes an index represented by a name
  50. func (c *Client) DeleteIndex(name string) (*Response, error) {
  51. r := Request{
  52. Conn: c,
  53. IndexList: []string{name},
  54. method: "DELETE",
  55. }
  56. return r.Run()
  57. }
  58. // RefreshIndex refreshes an index represented by a name
  59. func (c *Client) RefreshIndex(name string) (*Response, error) {
  60. r := Request{
  61. Conn: c,
  62. IndexList: []string{name},
  63. method: "POST",
  64. api: "_refresh",
  65. }
  66. return r.Run()
  67. }
  68. // UpdateIndexSettings updates settings for existing index represented by a name and a settings
  69. // as described here: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-update-settings.html
  70. func (c *Client) UpdateIndexSettings(name string, settings interface{}) (*Response, error) {
  71. r := Request{
  72. Conn: c,
  73. Query: settings,
  74. IndexList: []string{name},
  75. method: "PUT",
  76. api: "_settings",
  77. }
  78. return r.Run()
  79. }
  80. // Optimize an index represented by a name, extra args are also allowed please check:
  81. // http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-optimize.html#indices-optimize
  82. func (c *Client) Optimize(indexList []string, extraArgs url.Values) (*Response, error) {
  83. r := Request{
  84. Conn: c,
  85. IndexList: indexList,
  86. ExtraArgs: extraArgs,
  87. method: "POST",
  88. api: "_optimize",
  89. }
  90. return r.Run()
  91. }
  92. // Stats fetches statistics (_stats) for the current elasticsearch server
  93. func (c *Client) Stats(indexList []string, extraArgs url.Values) (*Response, error) {
  94. r := Request{
  95. Conn: c,
  96. IndexList: indexList,
  97. ExtraArgs: extraArgs,
  98. method: "GET",
  99. api: "_stats",
  100. }
  101. return r.Run()
  102. }
  103. // IndexStatus fetches the status (_status) for the indices defined in
  104. // indexList. Use _all in indexList to get stats for all indices
  105. func (c *Client) IndexStatus(indexList []string) (*Response, error) {
  106. r := Request{
  107. Conn: c,
  108. IndexList: indexList,
  109. method: "GET",
  110. api: "_status",
  111. }
  112. return r.Run()
  113. }
  114. // BulkSend bulk adds multiple documents in bulk mode
  115. func (c *Client) BulkSend(documents []Document) (*Response, error) {
  116. // We do not generate a traditional JSON here (often a one liner)
  117. // Elasticsearch expects one line of JSON per line (EOL = \n)
  118. // plus an extra \n at the very end of the document
  119. //
  120. // More informations about the Bulk JSON format for Elasticsearch:
  121. //
  122. // - http://www.elasticsearch.org/guide/reference/api/bulk.html
  123. //
  124. // This is quite annoying for us as we can not use the simple JSON
  125. // Marshaler available in Run().
  126. //
  127. // We have to generate this special JSON by ourselves which leads to
  128. // the code below.
  129. //
  130. // I know it is unreadable I must find an elegant way to fix this.
  131. // len(documents) * 2 : action + optional_sources
  132. // + 1 : room for the trailing \n
  133. bulkData := make([][]byte, len(documents)*2+1)
  134. i := 0
  135. for _, doc := range documents {
  136. action, err := json.Marshal(map[string]interface{}{
  137. doc.BulkCommand: map[string]interface{}{
  138. "_index": doc.Index,
  139. "_type": doc.Type,
  140. "_id": doc.ID,
  141. },
  142. })
  143. if err != nil {
  144. return &Response{}, err
  145. }
  146. bulkData[i] = action
  147. i++
  148. if doc.Fields != nil {
  149. if docFields, ok := doc.Fields.(map[string]interface{}); ok {
  150. if len(docFields) == 0 {
  151. continue
  152. }
  153. } else {
  154. typeOfFields := reflect.TypeOf(doc.Fields)
  155. if typeOfFields.Kind() == reflect.Ptr {
  156. typeOfFields = typeOfFields.Elem()
  157. }
  158. if typeOfFields.Kind() != reflect.Struct {
  159. return &Response{}, fmt.Errorf("Document fields not in struct or map[string]interface{} format")
  160. }
  161. if typeOfFields.NumField() == 0 {
  162. continue
  163. }
  164. }
  165. sources, err := json.Marshal(doc.Fields)
  166. if err != nil {
  167. return &Response{}, err
  168. }
  169. bulkData[i] = sources
  170. i++
  171. }
  172. }
  173. // forces an extra trailing \n absolutely necessary for elasticsearch
  174. bulkData[len(bulkData)-1] = []byte(nil)
  175. r := Request{
  176. Conn: c,
  177. method: "POST",
  178. api: "_bulk",
  179. bulkData: bytes.Join(bulkData, []byte("\n")),
  180. }
  181. return r.Run()
  182. }
  183. // Search executes a search query against an index
  184. func (c *Client) Search(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) {
  185. r := Request{
  186. Conn: c,
  187. Query: query,
  188. IndexList: indexList,
  189. TypeList: typeList,
  190. method: "POST",
  191. api: "_search",
  192. ExtraArgs: extraArgs,
  193. }
  194. return r.Run()
  195. }
  196. // Count executes a count query against an index, use the Count field in the response for the result
  197. func (c *Client) Count(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) {
  198. r := Request{
  199. Conn: c,
  200. Query: query,
  201. IndexList: indexList,
  202. TypeList: typeList,
  203. method: "POST",
  204. api: "_count",
  205. ExtraArgs: extraArgs,
  206. }
  207. return r.Run()
  208. }
  209. //Query runs a query against an index using the provided http method.
  210. //This method can be used to execute a delete by query, just pass in "DELETE"
  211. //for the HTTP method.
  212. func (c *Client) Query(query interface{}, indexList []string, typeList []string, httpMethod string, extraArgs url.Values) (*Response, error) {
  213. r := Request{
  214. Conn: c,
  215. Query: query,
  216. IndexList: indexList,
  217. TypeList: typeList,
  218. method: httpMethod,
  219. api: "_query",
  220. ExtraArgs: extraArgs,
  221. }
  222. return r.Run()
  223. }
  224. // Scan starts scroll over an index
  225. func (c *Client) Scan(query interface{}, indexList []string, typeList []string, timeout string, size int) (*Response, error) {
  226. v := url.Values{}
  227. v.Add("search_type", "scan")
  228. v.Add("scroll", timeout)
  229. v.Add("size", strconv.Itoa(size))
  230. r := Request{
  231. Conn: c,
  232. Query: query,
  233. IndexList: indexList,
  234. TypeList: typeList,
  235. method: "POST",
  236. api: "_search",
  237. ExtraArgs: v,
  238. }
  239. return r.Run()
  240. }
  241. // Scroll fetches data by scroll id
  242. func (c *Client) Scroll(scrollID string, timeout string) (*Response, error) {
  243. v := url.Values{}
  244. v.Add("scroll", timeout)
  245. r := Request{
  246. Conn: c,
  247. method: "POST",
  248. api: "_search/scroll",
  249. ExtraArgs: v,
  250. Body: []byte(scrollID),
  251. }
  252. return r.Run()
  253. }
  254. // Get a typed document by its id
  255. func (c *Client) Get(index string, documentType string, id string, extraArgs url.Values) (*Response, error) {
  256. r := Request{
  257. Conn: c,
  258. IndexList: []string{index},
  259. method: "GET",
  260. api: documentType + "/" + id,
  261. ExtraArgs: extraArgs,
  262. }
  263. return r.Run()
  264. }
  265. // Index indexes a Document
  266. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  267. // URL arguments, for example, to control routing, ttl, version, op_type, etc.
  268. func (c *Client) Index(d Document, extraArgs url.Values) (*Response, error) {
  269. r := Request{
  270. Conn: c,
  271. Query: d.Fields,
  272. IndexList: []string{d.Index.(string)},
  273. TypeList: []string{d.Type},
  274. ExtraArgs: extraArgs,
  275. method: "POST",
  276. }
  277. if d.ID != nil {
  278. r.method = "PUT"
  279. r.id = d.ID.(string)
  280. }
  281. return r.Run()
  282. }
  283. // Delete deletes a Document d
  284. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  285. // URL arguments, for example, to control routing.
  286. func (c *Client) Delete(d Document, extraArgs url.Values) (*Response, error) {
  287. r := Request{
  288. Conn: c,
  289. IndexList: []string{d.Index.(string)},
  290. TypeList: []string{d.Type},
  291. ExtraArgs: extraArgs,
  292. method: "DELETE",
  293. id: d.ID.(string),
  294. }
  295. return r.Run()
  296. }
  297. // Run executes an elasticsearch Request. It converts data to Json, sends the
  298. // request and returns the Response obtained
  299. func (req *Request) Run() (*Response, error) {
  300. body, statusCode, err := req.run()
  301. esResp := &Response{Status: statusCode}
  302. if err != nil {
  303. return esResp, err
  304. }
  305. if req.method != "HEAD" {
  306. err = json.Unmarshal(body, &esResp)
  307. if err != nil {
  308. return esResp, err
  309. }
  310. err = json.Unmarshal(body, &esResp.Raw)
  311. if err != nil {
  312. return esResp, err
  313. }
  314. }
  315. if req.api == "_bulk" && esResp.Errors {
  316. for _, item := range esResp.Items {
  317. for _, i := range item {
  318. if i.Error != "" {
  319. return esResp, &SearchError{i.Error, i.Status}
  320. }
  321. }
  322. }
  323. return esResp, &SearchError{Msg: "Unknown error while bulk indexing"}
  324. }
  325. if esResp.Error != "" {
  326. return esResp, &SearchError{esResp.Error, esResp.Status}
  327. }
  328. return esResp, nil
  329. }
  330. func (req *Request) run() ([]byte, uint64, error) {
  331. postData := []byte{}
  332. // XXX : refactor this
  333. if len(req.Body) > 0 {
  334. postData = req.Body
  335. } else if req.api == "_bulk" {
  336. postData = req.bulkData
  337. } else {
  338. b, err := json.Marshal(req.Query)
  339. if err != nil {
  340. return nil, 0, err
  341. }
  342. postData = b
  343. }
  344. reader := bytes.NewReader(postData)
  345. newReq, err := http.NewRequest(req.method, req.URL(), reader)
  346. if err != nil {
  347. return nil, 0, err
  348. }
  349. if req.method == "POST" || req.method == "PUT" {
  350. newReq.Header.Set("Content-Type", "application/json")
  351. }
  352. resp, err := req.Conn.Client.Do(newReq)
  353. if err != nil {
  354. return nil, 0, err
  355. }
  356. defer resp.Body.Close()
  357. body, err := ioutil.ReadAll(resp.Body)
  358. if err != nil {
  359. return nil, uint64(resp.StatusCode), err
  360. }
  361. if resp.StatusCode > 201 && resp.StatusCode < 400 {
  362. return nil, uint64(resp.StatusCode), errors.New(string(body))
  363. }
  364. return body, uint64(resp.StatusCode), nil
  365. }
  366. // URL builds a Request for a URL
  367. func (req *Request) URL() string {
  368. path := "/" + strings.Join(req.IndexList, ",")
  369. if len(req.TypeList) > 0 {
  370. path += "/" + strings.Join(req.TypeList, ",")
  371. }
  372. // XXX : for indexing documents using the normal (non bulk) API
  373. if len(req.id) > 0 {
  374. path += "/" + req.id
  375. }
  376. path += "/" + req.api
  377. u := url.URL{
  378. Scheme: "http",
  379. Host: fmt.Sprintf("%s:%s", req.Conn.Host, req.Conn.Port),
  380. Path: path,
  381. RawQuery: req.ExtraArgs.Encode(),
  382. }
  383. return u.String()
  384. }
  385. // Buckets returns list of buckets in aggregation
  386. func (a Aggregation) Buckets() []Bucket {
  387. result := []Bucket{}
  388. if buckets, ok := a["buckets"]; ok {
  389. for _, bucket := range buckets.([]interface{}) {
  390. result = append(result, bucket.(map[string]interface{}))
  391. }
  392. }
  393. return result
  394. }
  395. // Key returns key for aggregation bucket
  396. func (b Bucket) Key() interface{} {
  397. return b["key"]
  398. }
  399. // DocCount returns count of documents in this bucket
  400. func (b Bucket) DocCount() uint64 {
  401. return uint64(b["doc_count"].(float64))
  402. }
  403. // Aggregation returns aggregation by name from bucket
  404. func (b Bucket) Aggregation(name string) Aggregation {
  405. if agg, ok := b[name]; ok {
  406. return agg.(map[string]interface{})
  407. }
  408. return Aggregation{}
  409. }
  410. // PutMapping registers a specific mapping for one or more types in one or more indexes
  411. func (c *Client) PutMapping(typeName string, mapping interface{}, indexes []string) (*Response, error) {
  412. r := Request{
  413. Conn: c,
  414. Query: mapping,
  415. IndexList: indexes,
  416. method: "PUT",
  417. api: "_mappings/" + typeName,
  418. }
  419. return r.Run()
  420. }
  421. // GetMapping returns the mappings for the specified types
  422. func (c *Client) GetMapping(types []string, indexes []string) (*Response, error) {
  423. r := Request{
  424. Conn: c,
  425. IndexList: indexes,
  426. method: "GET",
  427. api: "_mapping/" + strings.Join(types, ","),
  428. }
  429. return r.Run()
  430. }
  431. // IndicesExist checks whether index (or indices) exist on the server
  432. func (c *Client) IndicesExist(indexes []string) (bool, error) {
  433. r := Request{
  434. Conn: c,
  435. IndexList: indexes,
  436. method: "HEAD",
  437. }
  438. resp, err := r.Run()
  439. return resp.Status == 200, err
  440. }
  441. // Update updates the specified document using the _update endpoint
  442. func (c *Client) Update(d Document, query interface{}, extraArgs url.Values) (*Response, error) {
  443. r := Request{
  444. Conn: c,
  445. Query: query,
  446. IndexList: []string{d.Index.(string)},
  447. TypeList: []string{d.Type},
  448. ExtraArgs: extraArgs,
  449. method: "POST",
  450. api: "_update",
  451. }
  452. if d.ID != nil {
  453. r.id = d.ID.(string)
  454. }
  455. return r.Run()
  456. }
  457. // DeleteMapping deletes a mapping along with all data in the type
  458. func (c *Client) DeleteMapping(typeName string, indexes []string) (*Response, error) {
  459. r := Request{
  460. Conn: c,
  461. IndexList: indexes,
  462. method: "DELETE",
  463. api: "_mappings/" + typeName,
  464. }
  465. return r.Run()
  466. }
  467. func (c *Client) modifyAlias(action string, alias string, indexes []string) (*Response, error) {
  468. command := map[string]interface{}{
  469. "actions": make([]map[string]interface{}, 1),
  470. }
  471. for _, index := range indexes {
  472. command["actions"] = append(command["actions"].([]map[string]interface{}), map[string]interface{}{
  473. action: map[string]interface{}{
  474. "index": index,
  475. "alias": alias,
  476. },
  477. })
  478. }
  479. r := Request{
  480. Conn: c,
  481. Query: command,
  482. method: "POST",
  483. api: "_aliases",
  484. }
  485. return r.Run()
  486. }
  487. // AddAlias creates an alias to one or more indexes
  488. func (c *Client) AddAlias(alias string, indexes []string) (*Response, error) {
  489. return c.modifyAlias("add", alias, indexes)
  490. }
  491. // RemoveAlias removes an alias to one or more indexes
  492. func (c *Client) RemoveAlias(alias string, indexes []string) (*Response, error) {
  493. return c.modifyAlias("remove", alias, indexes)
  494. }
  495. // AliasExists checks whether alias is defined on the server
  496. func (c *Client) AliasExists(alias string) (bool, error) {
  497. r := Request{
  498. Conn: c,
  499. method: "HEAD",
  500. api: "_alias/" + alias,
  501. }
  502. resp, err := r.Run()
  503. return resp.Status == 200, err
  504. }