You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

604 lines
14 KiB

  1. // Copyright 2013 Belogik. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package goes provides an API to access Elasticsearch.
  5. package goes
  6. import (
  7. "bytes"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "io/ioutil"
  12. "net/http"
  13. "net/url"
  14. "reflect"
  15. "strconv"
  16. "strings"
  17. )
  // BULK_COMMAND_INDEX is the name of the bulk "index" action.
  // NOTE(review): presumably meant as a value for Document.BulkCommand — confirm.
  const BULK_COMMAND_INDEX = "index"

  // BULK_COMMAND_DELETE is the name of the bulk "delete" action.
  // NOTE(review): presumably meant as a value for Document.BulkCommand — confirm.
  const BULK_COMMAND_DELETE = "delete"
  22. func (err *SearchError) Error() string {
  23. return fmt.Sprintf("[%d] %s", err.StatusCode, err.Msg)
  24. }
  25. // NewConnection initiates a new Connection to an elasticsearch server
  26. //
  27. // This function is pretty useless for now but might be useful in a near future
  28. // if wee need more features like connection pooling or load balancing.
  29. func NewConnection(host string, port string) *Connection {
  30. return &Connection{host, port, http.DefaultClient}
  31. }
  32. func (c *Connection) WithClient(cl *http.Client) *Connection {
  33. c.Client = cl
  34. return c
  35. }
  36. // CreateIndex creates a new index represented by a name and a mapping
  37. func (c *Connection) CreateIndex(name string, mapping interface{}) (*Response, error) {
  38. r := Request{
  39. Conn: c,
  40. Query: mapping,
  41. IndexList: []string{name},
  42. method: "PUT",
  43. }
  44. return r.Run()
  45. }
  46. // DeleteIndex deletes an index represented by a name
  47. func (c *Connection) DeleteIndex(name string) (*Response, error) {
  48. r := Request{
  49. Conn: c,
  50. IndexList: []string{name},
  51. method: "DELETE",
  52. }
  53. return r.Run()
  54. }
  55. // RefreshIndex refreshes an index represented by a name
  56. func (c *Connection) RefreshIndex(name string) (*Response, error) {
  57. r := Request{
  58. Conn: c,
  59. IndexList: []string{name},
  60. method: "POST",
  61. api: "_refresh",
  62. }
  63. return r.Run()
  64. }
  65. // UpdateIndexSettings updates settings for existing index represented by a name and a settings
  66. // as described here: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-update-settings.html
  67. func (c *Connection) UpdateIndexSettings(name string, settings interface{}) (*Response, error) {
  68. r := Request{
  69. Conn: c,
  70. Query: settings,
  71. IndexList: []string{name},
  72. method: "PUT",
  73. api: "_settings",
  74. }
  75. return r.Run()
  76. }
  77. // Optimize an index represented by a name, extra args are also allowed please check:
  78. // http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-optimize.html#indices-optimize
  79. func (c *Connection) Optimize(indexList []string, extraArgs url.Values) (*Response, error) {
  80. r := Request{
  81. Conn: c,
  82. IndexList: indexList,
  83. ExtraArgs: extraArgs,
  84. method: "POST",
  85. api: "_optimize",
  86. }
  87. return r.Run()
  88. }
  89. // Stats fetches statistics (_stats) for the current elasticsearch server
  90. func (c *Connection) Stats(indexList []string, extraArgs url.Values) (*Response, error) {
  91. r := Request{
  92. Conn: c,
  93. IndexList: indexList,
  94. ExtraArgs: extraArgs,
  95. method: "GET",
  96. api: "_stats",
  97. }
  98. return r.Run()
  99. }
  100. // IndexStatus fetches the status (_status) for the indices defined in
  101. // indexList. Use _all in indexList to get stats for all indices
  102. func (c *Connection) IndexStatus(indexList []string) (*Response, error) {
  103. r := Request{
  104. Conn: c,
  105. IndexList: indexList,
  106. method: "GET",
  107. api: "_status",
  108. }
  109. return r.Run()
  110. }
  111. // Bulk adds multiple documents in bulk mode
  112. func (c *Connection) BulkSend(documents []Document) (*Response, error) {
  113. // We do not generate a traditional JSON here (often a one liner)
  114. // Elasticsearch expects one line of JSON per line (EOL = \n)
  115. // plus an extra \n at the very end of the document
  116. //
  117. // More informations about the Bulk JSON format for Elasticsearch:
  118. //
  119. // - http://www.elasticsearch.org/guide/reference/api/bulk.html
  120. //
  121. // This is quite annoying for us as we can not use the simple JSON
  122. // Marshaler available in Run().
  123. //
  124. // We have to generate this special JSON by ourselves which leads to
  125. // the code below.
  126. //
  127. // I know it is unreadable I must find an elegant way to fix this.
  128. // len(documents) * 2 : action + optional_sources
  129. // + 1 : room for the trailing \n
  130. bulkData := make([][]byte, len(documents)*2+1)
  131. i := 0
  132. for _, doc := range documents {
  133. action, err := json.Marshal(map[string]interface{}{
  134. doc.BulkCommand: map[string]interface{}{
  135. "_index": doc.Index,
  136. "_type": doc.Type,
  137. "_id": doc.Id,
  138. },
  139. })
  140. if err != nil {
  141. return &Response{}, err
  142. }
  143. bulkData[i] = action
  144. i++
  145. if doc.Fields != nil {
  146. if docFields, ok := doc.Fields.(map[string]interface{}); ok {
  147. if len(docFields) == 0 {
  148. continue
  149. }
  150. } else {
  151. typeOfFields := reflect.TypeOf(doc.Fields)
  152. if typeOfFields.Kind() == reflect.Ptr {
  153. typeOfFields = typeOfFields.Elem()
  154. }
  155. if typeOfFields.Kind() != reflect.Struct {
  156. return &Response{}, fmt.Errorf("Document fields not in struct or map[string]interface{} format")
  157. }
  158. if typeOfFields.NumField() == 0 {
  159. continue
  160. }
  161. }
  162. sources, err := json.Marshal(doc.Fields)
  163. if err != nil {
  164. return &Response{}, err
  165. }
  166. bulkData[i] = sources
  167. i++
  168. }
  169. }
  170. // forces an extra trailing \n absolutely necessary for elasticsearch
  171. bulkData[len(bulkData)-1] = []byte(nil)
  172. r := Request{
  173. Conn: c,
  174. method: "POST",
  175. api: "_bulk",
  176. bulkData: bytes.Join(bulkData, []byte("\n")),
  177. }
  178. return r.Run()
  179. }
  180. // Search executes a search query against an index
  181. func (c *Connection) Search(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) {
  182. r := Request{
  183. Conn: c,
  184. Query: query,
  185. IndexList: indexList,
  186. TypeList: typeList,
  187. method: "POST",
  188. api: "_search",
  189. ExtraArgs: extraArgs,
  190. }
  191. return r.Run()
  192. }
  193. // Count executes a count query against an index, use the Count field in the response for the result
  194. func (c *Connection) Count(query interface{}, indexList []string, typeList []string, extraArgs url.Values) (*Response, error) {
  195. r := Request{
  196. Conn: c,
  197. Query: query,
  198. IndexList: indexList,
  199. TypeList: typeList,
  200. method: "POST",
  201. api: "_count",
  202. ExtraArgs: extraArgs,
  203. }
  204. return r.Run()
  205. }
  206. //Query runs a query against an index using the provided http method.
  207. //This method can be used to execute a delete by query, just pass in "DELETE"
  208. //for the HTTP method.
  209. func (c *Connection) Query(query interface{}, indexList []string, typeList []string, httpMethod string, extraArgs url.Values) (*Response, error) {
  210. r := Request{
  211. Conn: c,
  212. Query: query,
  213. IndexList: indexList,
  214. TypeList: typeList,
  215. method: httpMethod,
  216. api: "_query",
  217. ExtraArgs: extraArgs,
  218. }
  219. return r.Run()
  220. }
  221. // Scan starts scroll over an index
  222. func (c *Connection) Scan(query interface{}, indexList []string, typeList []string, timeout string, size int) (*Response, error) {
  223. v := url.Values{}
  224. v.Add("search_type", "scan")
  225. v.Add("scroll", timeout)
  226. v.Add("size", strconv.Itoa(size))
  227. r := Request{
  228. Conn: c,
  229. Query: query,
  230. IndexList: indexList,
  231. TypeList: typeList,
  232. method: "POST",
  233. api: "_search",
  234. ExtraArgs: v,
  235. }
  236. return r.Run()
  237. }
  238. // Scroll fetches data by scroll id
  239. func (c *Connection) Scroll(scrollId string, timeout string) (*Response, error) {
  240. v := url.Values{}
  241. v.Add("scroll", timeout)
  242. r := Request{
  243. Conn: c,
  244. method: "POST",
  245. api: "_search/scroll",
  246. ExtraArgs: v,
  247. Body: []byte(scrollId),
  248. }
  249. return r.Run()
  250. }
  251. // Get a typed document by its id
  252. func (c *Connection) Get(index string, documentType string, id string, extraArgs url.Values) (*Response, error) {
  253. r := Request{
  254. Conn: c,
  255. IndexList: []string{index},
  256. method: "GET",
  257. api: documentType + "/" + id,
  258. ExtraArgs: extraArgs,
  259. }
  260. return r.Run()
  261. }
  262. // Index indexes a Document
  263. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  264. // URL arguments, for example, to control routing, ttl, version, op_type, etc.
  265. func (c *Connection) Index(d Document, extraArgs url.Values) (*Response, error) {
  266. r := Request{
  267. Conn: c,
  268. Query: d.Fields,
  269. IndexList: []string{d.Index.(string)},
  270. TypeList: []string{d.Type},
  271. ExtraArgs: extraArgs,
  272. method: "POST",
  273. }
  274. if d.Id != nil {
  275. r.method = "PUT"
  276. r.id = d.Id.(string)
  277. }
  278. return r.Run()
  279. }
  280. // Delete deletes a Document d
  281. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  282. // URL arguments, for example, to control routing.
  283. func (c *Connection) Delete(d Document, extraArgs url.Values) (*Response, error) {
  284. r := Request{
  285. Conn: c,
  286. IndexList: []string{d.Index.(string)},
  287. TypeList: []string{d.Type},
  288. ExtraArgs: extraArgs,
  289. method: "DELETE",
  290. id: d.Id.(string),
  291. }
  292. return r.Run()
  293. }
  294. // Run executes an elasticsearch Request. It converts data to Json, sends the
  295. // request and returns the Response obtained
  296. func (req *Request) Run() (*Response, error) {
  297. body, statusCode, err := req.run()
  298. esResp := &Response{Status: statusCode}
  299. if err != nil {
  300. return esResp, err
  301. }
  302. if req.method != "HEAD" {
  303. err = json.Unmarshal(body, &esResp)
  304. if err != nil {
  305. return esResp, err
  306. }
  307. err = json.Unmarshal(body, &esResp.Raw)
  308. if err != nil {
  309. return esResp, err
  310. }
  311. }
  312. if req.api == "_bulk" && esResp.Errors {
  313. for _, item := range esResp.Items {
  314. for _, i := range item {
  315. if i.Error != "" {
  316. return esResp, &SearchError{i.Error, i.Status}
  317. }
  318. }
  319. }
  320. return esResp, &SearchError{Msg: "Unknown error while bulk indexing"}
  321. }
  322. if esResp.Error != "" {
  323. return esResp, &SearchError{esResp.Error, esResp.Status}
  324. }
  325. return esResp, nil
  326. }
  327. func (req *Request) run() ([]byte, uint64, error) {
  328. postData := []byte{}
  329. // XXX : refactor this
  330. if len(req.Body) > 0 {
  331. postData = req.Body
  332. } else if req.api == "_bulk" {
  333. postData = req.bulkData
  334. } else {
  335. b, err := json.Marshal(req.Query)
  336. if err != nil {
  337. return nil, 0, err
  338. }
  339. postData = b
  340. }
  341. reader := bytes.NewReader(postData)
  342. newReq, err := http.NewRequest(req.method, req.Url(), reader)
  343. if err != nil {
  344. return nil, 0, err
  345. }
  346. if req.method == "POST" || req.method == "PUT" {
  347. newReq.Header.Set("Content-Type", "application/json")
  348. }
  349. resp, err := req.Conn.Client.Do(newReq)
  350. if err != nil {
  351. return nil, 0, err
  352. }
  353. defer resp.Body.Close()
  354. body, err := ioutil.ReadAll(resp.Body)
  355. if err != nil {
  356. return nil, uint64(resp.StatusCode), err
  357. }
  358. if resp.StatusCode > 201 && resp.StatusCode < 400 {
  359. return nil, uint64(resp.StatusCode), errors.New(string(body))
  360. }
  361. return body, uint64(resp.StatusCode), nil
  362. }
  363. // Url builds a Request for a URL
  364. func (r *Request) Url() string {
  365. path := "/" + strings.Join(r.IndexList, ",")
  366. if len(r.TypeList) > 0 {
  367. path += "/" + strings.Join(r.TypeList, ",")
  368. }
  369. // XXX : for indexing documents using the normal (non bulk) API
  370. if len(r.id) > 0 {
  371. path += "/" + r.id
  372. }
  373. path += "/" + r.api
  374. u := url.URL{
  375. Scheme: "http",
  376. Host: fmt.Sprintf("%s:%s", r.Conn.Host, r.Conn.Port),
  377. Path: path,
  378. RawQuery: r.ExtraArgs.Encode(),
  379. }
  380. return u.String()
  381. }
  382. // Buckets returns list of buckets in aggregation
  383. func (a Aggregation) Buckets() []Bucket {
  384. result := []Bucket{}
  385. if buckets, ok := a["buckets"]; ok {
  386. for _, bucket := range buckets.([]interface{}) {
  387. result = append(result, bucket.(map[string]interface{}))
  388. }
  389. }
  390. return result
  391. }
  392. // Key returns key for aggregation bucket
  393. func (b Bucket) Key() interface{} {
  394. return b["key"]
  395. }
  396. // DocCount returns count of documents in this bucket
  397. func (b Bucket) DocCount() uint64 {
  398. return uint64(b["doc_count"].(float64))
  399. }
  400. // Aggregation returns aggregation by name from bucket
  401. func (b Bucket) Aggregation(name string) Aggregation {
  402. if agg, ok := b[name]; ok {
  403. return agg.(map[string]interface{})
  404. } else {
  405. return Aggregation{}
  406. }
  407. }
  408. // PutMapping registers a specific mapping for one or more types in one or more indexes
  409. func (c *Connection) PutMapping(typeName string, mapping interface{}, indexes []string) (*Response, error) {
  410. r := Request{
  411. Conn: c,
  412. Query: mapping,
  413. IndexList: indexes,
  414. method: "PUT",
  415. api: "_mappings/" + typeName,
  416. }
  417. return r.Run()
  418. }
  419. func (c *Connection) GetMapping(types []string, indexes []string) (*Response, error) {
  420. r := Request{
  421. Conn: c,
  422. IndexList: indexes,
  423. method: "GET",
  424. api: "_mapping/" + strings.Join(types, ","),
  425. }
  426. return r.Run()
  427. }
  428. // IndicesExist checks whether index (or indices) exist on the server
  429. func (c *Connection) IndicesExist(indexes []string) (bool, error) {
  430. r := Request{
  431. Conn: c,
  432. IndexList: indexes,
  433. method: "HEAD",
  434. }
  435. resp, err := r.Run()
  436. return resp.Status == 200, err
  437. }
  438. func (c *Connection) Update(d Document, query interface{}, extraArgs url.Values) (*Response, error) {
  439. r := Request{
  440. Conn: c,
  441. Query: query,
  442. IndexList: []string{d.Index.(string)},
  443. TypeList: []string{d.Type},
  444. ExtraArgs: extraArgs,
  445. method: "POST",
  446. api: "_update",
  447. }
  448. if d.Id != nil {
  449. r.id = d.Id.(string)
  450. }
  451. return r.Run()
  452. }
  453. // DeleteMapping deletes a mapping along with all data in the type
  454. func (c *Connection) DeleteMapping(typeName string, indexes []string) (*Response, error) {
  455. r := Request{
  456. Conn: c,
  457. IndexList: indexes,
  458. method: "DELETE",
  459. api: "_mappings/" + typeName,
  460. }
  461. return r.Run()
  462. }
  463. func (c *Connection) modifyAlias(action string, alias string, indexes []string) (*Response, error) {
  464. command := map[string]interface{}{
  465. "actions": make([]map[string]interface{}, 1),
  466. }
  467. for _, index := range indexes {
  468. command["actions"] = append(command["actions"].([]map[string]interface{}), map[string]interface{}{
  469. action: map[string]interface{}{
  470. "index": index,
  471. "alias": alias,
  472. },
  473. })
  474. }
  475. r := Request{
  476. Conn: c,
  477. Query: command,
  478. method: "POST",
  479. api: "_aliases",
  480. }
  481. return r.Run()
  482. }
  483. // AddAlias creates an alias to one or more indexes
  484. func (c *Connection) AddAlias(alias string, indexes []string) (*Response, error) {
  485. return c.modifyAlias("add", alias, indexes)
  486. }
  487. // RemoveAlias removes an alias to one or more indexes
  488. func (c *Connection) RemoveAlias(alias string, indexes []string) (*Response, error) {
  489. return c.modifyAlias("remove", alias, indexes)
  490. }
  491. // AliasExists checks whether alias is defined on the server
  492. func (c *Connection) AliasExists(alias string) (bool, error) {
  493. r := Request{
  494. Conn: c,
  495. method: "HEAD",
  496. api: "_alias/" + alias,
  497. }
  498. resp, err := r.Run()
  499. return resp.Status == 200, err
  500. }