You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

503 lines
12 KiB

  1. // Copyright 2013 Belogik. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package goes provides an API to access Elasticsearch.
  5. package goes
  6. import (
  7. "bytes"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "io/ioutil"
  12. "net/http"
  13. "net/url"
  14. "reflect"
  15. "strconv"
  16. "strings"
  17. )
// Names of bulk actions.
// NOTE(review): presumably meant as values for Document.BulkCommand, which
// BulkSend uses as the action key — confirm against callers.
const (
	// BULK_COMMAND_INDEX is the action name "index".
	BULK_COMMAND_INDEX = "index"
	// BULK_COMMAND_DELETE is the action name "delete".
	BULK_COMMAND_DELETE = "delete"
)
  22. func (err *SearchError) Error() string {
  23. return fmt.Sprintf("[%d] %s", err.StatusCode, err.Msg)
  24. }
  25. // NewConnection initiates a new Connection to an elasticsearch server
  26. //
  27. // This function is pretty useless for now but might be useful in a near future
  28. // if wee need more features like connection pooling or load balancing.
  29. func NewConnection(host string, port string) *Connection {
  30. return &Connection{host, port, http.DefaultClient}
  31. }
  32. func (c *Connection) WithClient(cl *http.Client) *Connection {
  33. c.Client = cl
  34. return c
  35. }
  36. // CreateIndex creates a new index represented by a name and a mapping
  37. func (c *Connection) CreateIndex(name string, mapping map[string]interface{}) (Response, error) {
  38. r := Request{
  39. Conn: c,
  40. Query: mapping,
  41. IndexList: []string{name},
  42. method: "PUT",
  43. }
  44. return r.Run()
  45. }
  46. // DeleteIndex deletes an index represented by a name
  47. func (c *Connection) DeleteIndex(name string) (Response, error) {
  48. r := Request{
  49. Conn: c,
  50. IndexList: []string{name},
  51. method: "DELETE",
  52. }
  53. return r.Run()
  54. }
  55. // RefreshIndex refreshes an index represented by a name
  56. func (c *Connection) RefreshIndex(name string) (Response, error) {
  57. r := Request{
  58. Conn: c,
  59. IndexList: []string{name},
  60. method: "POST",
  61. api: "_refresh",
  62. }
  63. return r.Run()
  64. }
  65. // Optimize an index represented by a name, extra args are also allowed please check:
  66. // http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-optimize.html#indices-optimize
  67. func (c *Connection) Optimize(indexList []string, extraArgs url.Values) (Response, error) {
  68. r := Request{
  69. Conn: c,
  70. IndexList: indexList,
  71. ExtraArgs: extraArgs,
  72. method: "POST",
  73. api: "_optimize",
  74. }
  75. return r.Run()
  76. }
  77. // Stats fetches statistics (_stats) for the current elasticsearch server
  78. func (c *Connection) Stats(indexList []string, extraArgs url.Values) (Response, error) {
  79. r := Request{
  80. Conn: c,
  81. IndexList: indexList,
  82. ExtraArgs: extraArgs,
  83. method: "GET",
  84. api: "_stats",
  85. }
  86. return r.Run()
  87. }
  88. // IndexStatus fetches the status (_status) for the indices defined in
  89. // indexList. Use _all in indexList to get stats for all indices
  90. func (c *Connection) IndexStatus(indexList []string) (Response, error) {
  91. r := Request{
  92. Conn: c,
  93. IndexList: indexList,
  94. method: "GET",
  95. api: "_status",
  96. }
  97. return r.Run()
  98. }
  99. // Bulk adds multiple documents in bulk mode
  100. func (c *Connection) BulkSend(documents []Document) (Response, error) {
  101. // We do not generate a traditional JSON here (often a one liner)
  102. // Elasticsearch expects one line of JSON per line (EOL = \n)
  103. // plus an extra \n at the very end of the document
  104. //
  105. // More informations about the Bulk JSON format for Elasticsearch:
  106. //
  107. // - http://www.elasticsearch.org/guide/reference/api/bulk.html
  108. //
  109. // This is quite annoying for us as we can not use the simple JSON
  110. // Marshaler available in Run().
  111. //
  112. // We have to generate this special JSON by ourselves which leads to
  113. // the code below.
  114. //
  115. // I know it is unreadable I must find an elegant way to fix this.
  116. // len(documents) * 2 : action + optional_sources
  117. // + 1 : room for the trailing \n
  118. bulkData := make([][]byte, len(documents)*2+1)
  119. i := 0
  120. for _, doc := range documents {
  121. action, err := json.Marshal(map[string]interface{}{
  122. doc.BulkCommand: map[string]interface{}{
  123. "_index": doc.Index,
  124. "_type": doc.Type,
  125. "_id": doc.Id,
  126. },
  127. })
  128. if err != nil {
  129. return Response{}, err
  130. }
  131. bulkData[i] = action
  132. i++
  133. if doc.Fields != nil {
  134. if docFields, ok := doc.Fields.(map[string]interface{}); ok {
  135. if len(docFields) == 0 {
  136. continue
  137. }
  138. } else {
  139. typeOfFields := reflect.TypeOf(doc.Fields)
  140. if typeOfFields.Kind() == reflect.Ptr {
  141. typeOfFields = typeOfFields.Elem()
  142. }
  143. if typeOfFields.Kind() != reflect.Struct {
  144. return Response{}, fmt.Errorf("Document fields not in struct or map[string]interface{} format")
  145. }
  146. if typeOfFields.NumField() == 0 {
  147. continue
  148. }
  149. }
  150. sources, err := json.Marshal(doc.Fields)
  151. if err != nil {
  152. return Response{}, err
  153. }
  154. bulkData[i] = sources
  155. i++
  156. }
  157. }
  158. // forces an extra trailing \n absolutely necessary for elasticsearch
  159. bulkData[len(bulkData)-1] = []byte(nil)
  160. r := Request{
  161. Conn: c,
  162. method: "POST",
  163. api: "_bulk",
  164. bulkData: bytes.Join(bulkData, []byte("\n")),
  165. }
  166. return r.Run()
  167. }
  168. // Search executes a search query against an index
  169. func (c *Connection) Search(query map[string]interface{}, indexList []string, typeList []string, extraArgs url.Values) (Response, error) {
  170. r := Request{
  171. Conn: c,
  172. Query: query,
  173. IndexList: indexList,
  174. TypeList: typeList,
  175. method: "POST",
  176. api: "_search",
  177. ExtraArgs: extraArgs,
  178. }
  179. return r.Run()
  180. }
  181. //Query runs a query against an index using the provided http method.
  182. //This method can be used to execute a delete by query, just pass in "DELETE"
  183. //for the HTTP method.
  184. func (c *Connection) Query(query map[string]interface{}, indexList []string, typeList []string, httpMethod string, extraArgs url.Values) (Response, error) {
  185. r := Request{
  186. Conn: c,
  187. Query: query,
  188. IndexList: indexList,
  189. TypeList: typeList,
  190. method: httpMethod,
  191. api: "_query",
  192. ExtraArgs: extraArgs,
  193. }
  194. return r.Run()
  195. }
  196. // Scan starts scroll over an index
  197. func (c *Connection) Scan(query map[string]interface{}, indexList []string, typeList []string, timeout string, size int) (Response, error) {
  198. v := url.Values{}
  199. v.Add("search_type", "scan")
  200. v.Add("scroll", timeout)
  201. v.Add("size", strconv.Itoa(size))
  202. r := Request{
  203. Conn: c,
  204. Query: query,
  205. IndexList: indexList,
  206. TypeList: typeList,
  207. method: "POST",
  208. api: "_search",
  209. ExtraArgs: v,
  210. }
  211. return r.Run()
  212. }
  213. // Scroll fetches data by scroll id
  214. func (c *Connection) Scroll(scrollId string, timeout string) (Response, error) {
  215. v := url.Values{}
  216. v.Add("scroll", timeout)
  217. r := Request{
  218. Conn: c,
  219. method: "POST",
  220. api: "_search/scroll",
  221. ExtraArgs: v,
  222. Body: []byte(scrollId),
  223. }
  224. return r.Run()
  225. }
  226. // Get a typed document by its id
  227. func (c *Connection) Get(index string, documentType string, id string, extraArgs url.Values) (Response, error) {
  228. r := Request{
  229. Conn: c,
  230. IndexList: []string{index},
  231. method: "GET",
  232. api: documentType + "/" + id,
  233. ExtraArgs: extraArgs,
  234. }
  235. return r.Run()
  236. }
  237. // Index indexes a Document
  238. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  239. // URL arguments, for example, to control routing, ttl, version, op_type, etc.
  240. func (c *Connection) Index(d Document, extraArgs url.Values) (Response, error) {
  241. r := Request{
  242. Conn: c,
  243. Query: d.Fields,
  244. IndexList: []string{d.Index.(string)},
  245. TypeList: []string{d.Type},
  246. ExtraArgs: extraArgs,
  247. method: "POST",
  248. }
  249. if d.Id != nil {
  250. r.method = "PUT"
  251. r.id = d.Id.(string)
  252. }
  253. return r.Run()
  254. }
  255. // Delete deletes a Document d
  256. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  257. // URL arguments, for example, to control routing.
  258. func (c *Connection) Delete(d Document, extraArgs url.Values) (Response, error) {
  259. r := Request{
  260. Conn: c,
  261. IndexList: []string{d.Index.(string)},
  262. TypeList: []string{d.Type},
  263. ExtraArgs: extraArgs,
  264. method: "DELETE",
  265. id: d.Id.(string),
  266. }
  267. return r.Run()
  268. }
  269. // Run executes an elasticsearch Request. It converts data to Json, sends the
  270. // request and return the Response obtained
  271. func (req *Request) Run() (Response, error) {
  272. postData := []byte{}
  273. // XXX : refactor this
  274. if len(req.Body) > 0 {
  275. postData = req.Body
  276. } else if req.api == "_bulk" {
  277. postData = req.bulkData
  278. } else {
  279. b, err := json.Marshal(req.Query)
  280. if err != nil {
  281. return Response{}, err
  282. }
  283. postData = b
  284. }
  285. reader := bytes.NewReader(postData)
  286. newReq, err := http.NewRequest(req.method, req.Url(), reader)
  287. if err != nil {
  288. return Response{}, err
  289. }
  290. if req.method == "POST" || req.method == "PUT" {
  291. newReq.Header.Set("Content-Type", "application/x-www-form-urlencoded")
  292. }
  293. resp, err := req.Conn.Client.Do(newReq)
  294. if err != nil {
  295. return Response{}, err
  296. }
  297. defer resp.Body.Close()
  298. body, err := ioutil.ReadAll(resp.Body)
  299. if err != nil {
  300. return Response{}, err
  301. }
  302. if resp.StatusCode > 201 && resp.StatusCode < 400 {
  303. return Response{}, errors.New(string(body))
  304. }
  305. esResp := new(Response)
  306. if req.method == "HEAD" {
  307. esResp.Status = uint64(resp.StatusCode)
  308. } else {
  309. err = json.Unmarshal(body, &esResp)
  310. if err != nil {
  311. return Response{}, err
  312. }
  313. json.Unmarshal(body, &esResp.Raw)
  314. }
  315. if req.api == "_bulk" && esResp.Errors {
  316. for _, item := range esResp.Items {
  317. for _, i := range item {
  318. if i.Error != "" {
  319. return Response{}, &SearchError{i.Error, i.Status}
  320. }
  321. }
  322. }
  323. return Response{}, &SearchError{Msg: "Unknown error while bulk indexing"}
  324. }
  325. if esResp.Error != "" {
  326. return Response{}, &SearchError{esResp.Error, esResp.Status}
  327. }
  328. return *esResp, nil
  329. }
  330. // Url builds a Request for a URL
  331. func (r *Request) Url() string {
  332. path := "/" + strings.Join(r.IndexList, ",")
  333. if len(r.TypeList) > 0 {
  334. path += "/" + strings.Join(r.TypeList, ",")
  335. }
  336. // XXX : for indexing documents using the normal (non bulk) API
  337. if len(r.id) > 0 {
  338. path += "/" + r.id
  339. }
  340. path += "/" + r.api
  341. u := url.URL{
  342. Scheme: "http",
  343. Host: fmt.Sprintf("%s:%s", r.Conn.Host, r.Conn.Port),
  344. Path: path,
  345. RawQuery: r.ExtraArgs.Encode(),
  346. }
  347. return u.String()
  348. }
  349. // Buckets returns list of buckets in aggregation
  350. func (a Aggregation) Buckets() []Bucket {
  351. result := []Bucket{}
  352. if buckets, ok := a["buckets"]; ok {
  353. for _, bucket := range buckets.([]interface{}) {
  354. result = append(result, bucket.(map[string]interface{}))
  355. }
  356. }
  357. return result
  358. }
  359. // Key returns key for aggregation bucket
  360. func (b Bucket) Key() interface{} {
  361. return b["key"]
  362. }
  363. // DocCount returns count of documents in this bucket
  364. func (b Bucket) DocCount() uint64 {
  365. return uint64(b["doc_count"].(float64))
  366. }
  367. // Aggregation returns aggregation by name from bucket
  368. func (b Bucket) Aggregation(name string) Aggregation {
  369. if agg, ok := b[name]; ok {
  370. return agg.(map[string]interface{})
  371. } else {
  372. return Aggregation{}
  373. }
  374. }
  375. // PutMapping registers a specific mapping for one or more types in one or more indexes
  376. func (c *Connection) PutMapping(typeName string, mapping map[string]interface{}, indexes []string) (Response, error) {
  377. r := Request{
  378. Conn: c,
  379. Query: mapping,
  380. IndexList: indexes,
  381. method: "PUT",
  382. api: "_mappings/" + typeName,
  383. }
  384. return r.Run()
  385. }
  386. func (c *Connection) GetMapping(types []string, indexes []string) (Response, error) {
  387. r := Request{
  388. Conn: c,
  389. IndexList: indexes,
  390. method: "GET",
  391. api: "_mapping/" + strings.Join(types, ","),
  392. }
  393. return r.Run()
  394. }
  395. // IndicesExist checks whether index (or indices) exist on the server
  396. func (c *Connection) IndicesExist(indexes []string) (bool, error) {
  397. r := Request{
  398. Conn: c,
  399. IndexList: indexes,
  400. method: "HEAD",
  401. }
  402. resp, err := r.Run()
  403. return resp.Status == 200, err
  404. }
  405. func (c *Connection) Update(d Document, query map[string]interface{}, extraArgs url.Values) (Response, error) {
  406. r := Request{
  407. Conn: c,
  408. Query: query,
  409. IndexList: []string{d.Index.(string)},
  410. TypeList: []string{d.Type},
  411. ExtraArgs: extraArgs,
  412. method: "POST",
  413. api: "_update",
  414. }
  415. if d.Id != nil {
  416. r.id = d.Id.(string)
  417. }
  418. return r.Run()
  419. }