You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

386 lines
8.9 KiB

  1. // Copyright 2013 Belogik. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package goes provides an API to access Elasticsearch.
  5. package goes
  6. import (
  7. "bytes"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "io/ioutil"
  12. "net/http"
  13. "net/url"
  14. "strconv"
  15. "strings"
  16. )
  17. const (
  18. BULK_COMMAND_INDEX = "index"
  19. BULK_COMMAND_DELETE = "delete"
  20. )
  21. func (err *SearchError) Error() string {
  22. return fmt.Sprintf("[%d] %s", err.StatusCode, err.Msg)
  23. }
  24. // NewConnection initiates a new Connection to an elasticsearch server
  25. //
  26. // This function is pretty useless for now but might be useful in a near future
  27. // if wee need more features like connection pooling or load balancing.
  28. func NewConnection(host string, port string) *Connection {
  29. return &Connection{host, port, http.DefaultClient}
  30. }
  31. func (c *Connection) WithClient(cl *http.Client) *Connection {
  32. c.Client = cl
  33. return c
  34. }
  35. // CreateIndex creates a new index represented by a name and a mapping
  36. func (c *Connection) CreateIndex(name string, mapping map[string]interface{}) (Response, error) {
  37. r := Request{
  38. Conn: c,
  39. Query: mapping,
  40. IndexList: []string{name},
  41. method: "PUT",
  42. }
  43. return r.Run()
  44. }
  45. // DeleteIndex deletes an index represented by a name
  46. func (c *Connection) DeleteIndex(name string) (Response, error) {
  47. r := Request{
  48. Conn: c,
  49. IndexList: []string{name},
  50. method: "DELETE",
  51. }
  52. return r.Run()
  53. }
  54. // RefreshIndex refreshes an index represented by a name
  55. func (c *Connection) RefreshIndex(name string) (Response, error) {
  56. r := Request{
  57. Conn: c,
  58. IndexList: []string{name},
  59. method: "POST",
  60. api: "_refresh",
  61. }
  62. return r.Run()
  63. }
  64. // Stats fetches statistics (_stats) for the current elasticsearch server
  65. func (c *Connection) Stats(indexList []string, extraArgs url.Values) (Response, error) {
  66. r := Request{
  67. Conn: c,
  68. IndexList: indexList,
  69. ExtraArgs: extraArgs,
  70. method: "GET",
  71. api: "_stats",
  72. }
  73. return r.Run()
  74. }
  75. // IndexStatus fetches the status (_status) for the indices defined in
  76. // indexList. Use _all in indexList to get stats for all indices
  77. func (c *Connection) IndexStatus(indexList []string) (Response, error) {
  78. r := Request{
  79. Conn: c,
  80. IndexList: indexList,
  81. method: "GET",
  82. api: "_status",
  83. }
  84. return r.Run()
  85. }
  86. // Bulk adds multiple documents in bulk mode to the index for a given type
  87. func (c *Connection) BulkSend(index string, documents []Document) (Response, error) {
  88. // We do not generate a traditionnal JSON here (often a one liner)
  89. // Elasticsearch expects one line of JSON per line (EOL = \n)
  90. // plus an extra \n at the very end of the document
  91. //
  92. // More informations about the Bulk JSON format for Elasticsearch:
  93. //
  94. // - http://www.elasticsearch.org/guide/reference/api/bulk.html
  95. //
  96. // This is quite annoying for us as we can not use the simple JSON
  97. // Marshaler available in Run().
  98. //
  99. // We have to generate this special JSON by ourselves which leads to
  100. // the code below.
  101. //
  102. // I know it is unreadable I must find an elegant way to fix this.
  103. // len(documents) * 2 : action + optional_sources
  104. // + 1 : room for the trailing \n
  105. bulkData := make([][]byte, len(documents)*2+1)
  106. i := 0
  107. for _, doc := range documents {
  108. action, err := json.Marshal(map[string]interface{}{
  109. doc.BulkCommand: map[string]interface{}{
  110. "_index": doc.Index,
  111. "_type": doc.Type,
  112. "_id": doc.Id,
  113. },
  114. })
  115. if err != nil {
  116. return Response{}, err
  117. }
  118. bulkData[i] = action
  119. i++
  120. if len(doc.Fields) > 0 {
  121. fields := make(map[string]interface{}, len(doc.Fields))
  122. for fieldName, fieldValue := range doc.Fields {
  123. fields[fieldName] = fieldValue
  124. }
  125. sources, err := json.Marshal(fields)
  126. if err != nil {
  127. return Response{}, err
  128. }
  129. bulkData[i] = sources
  130. i++
  131. }
  132. }
  133. // forces an extra trailing \n absolutely necessary for elasticsearch
  134. bulkData[len(bulkData)-1] = []byte(nil)
  135. r := Request{
  136. Conn: c,
  137. IndexList: []string{index},
  138. method: "POST",
  139. api: "_bulk",
  140. bulkData: bytes.Join(bulkData, []byte("\n")),
  141. }
  142. return r.Run()
  143. }
  144. // Search executes a search query against an index
  145. func (c *Connection) Search(query map[string]interface{}, indexList []string, typeList []string, extraArgs url.Values) (Response, error) {
  146. r := Request{
  147. Conn: c,
  148. Query: query,
  149. IndexList: indexList,
  150. TypeList: typeList,
  151. method: "POST",
  152. api: "_search",
  153. ExtraArgs: extraArgs,
  154. }
  155. return r.Run()
  156. }
  157. // Scan starts scroll over an index
  158. func (c *Connection) Scan(query map[string]interface{}, indexList []string, typeList []string, timeout string, size int) (Response, error) {
  159. v := url.Values{}
  160. v.Add("search_type", "scan")
  161. v.Add("scroll", timeout)
  162. v.Add("size", strconv.Itoa(size))
  163. r := Request{
  164. Conn: c,
  165. Query: query,
  166. IndexList: indexList,
  167. TypeList: typeList,
  168. method: "POST",
  169. api: "_search",
  170. ExtraArgs: v,
  171. }
  172. return r.Run()
  173. }
  174. // Scroll fetches data by scroll id
  175. func (c *Connection) Scroll(scrollId string, timeout string) (Response, error) {
  176. v := url.Values{}
  177. v.Add("scroll", timeout)
  178. r := Request{
  179. Conn: c,
  180. method: "POST",
  181. api: "_search/scroll",
  182. ExtraArgs: v,
  183. Body: []byte(scrollId),
  184. }
  185. return r.Run()
  186. }
  187. // Get a typed document by its id
  188. func (c *Connection) Get(index string, documentType string, id string, extraArgs url.Values) (Response, error) {
  189. r := Request{
  190. Conn: c,
  191. IndexList: []string{index},
  192. method: "GET",
  193. api: documentType + "/" + id,
  194. ExtraArgs: extraArgs,
  195. }
  196. return r.Run()
  197. }
  198. // Index indexes a Document
  199. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  200. // URL arguments, for example, to control routing, ttl, version, op_type, etc.
  201. func (c *Connection) Index(d Document, extraArgs url.Values) (Response, error) {
  202. r := Request{
  203. Conn: c,
  204. Query: d.Fields,
  205. IndexList: []string{d.Index.(string)},
  206. TypeList: []string{d.Type},
  207. ExtraArgs: extraArgs,
  208. method: "POST",
  209. }
  210. if d.Id != nil {
  211. r.method = "PUT"
  212. r.id = d.Id.(string)
  213. }
  214. return r.Run()
  215. }
  216. // Delete deletes a Document d
  217. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  218. // URL arguments, for example, to control routing.
  219. func (c *Connection) Delete(d Document, extraArgs url.Values) (Response, error) {
  220. r := Request{
  221. Conn: c,
  222. IndexList: []string{d.Index.(string)},
  223. TypeList: []string{d.Type},
  224. ExtraArgs: extraArgs,
  225. method: "DELETE",
  226. id: d.Id.(string),
  227. }
  228. return r.Run()
  229. }
  230. // Run executes an elasticsearch Request. It converts data to Json, sends the
  231. // request and return the Response obtained
  232. func (req *Request) Run() (Response, error) {
  233. postData := []byte{}
  234. // XXX : refactor this
  235. if len(req.Body) > 0 {
  236. postData = req.Body
  237. } else if req.api == "_bulk" {
  238. postData = req.bulkData
  239. } else {
  240. b, err := json.Marshal(req.Query)
  241. if err != nil {
  242. return Response{}, err
  243. }
  244. postData = b
  245. }
  246. reader := bytes.NewReader(postData)
  247. newReq, err := http.NewRequest(req.method, req.Url(), reader)
  248. if err != nil {
  249. return Response{}, err
  250. }
  251. if req.method == "POST" || req.method == "PUT" {
  252. newReq.Header.Set("Content-Type", "application/x-www-form-urlencoded")
  253. }
  254. resp, err := req.Conn.Client.Do(newReq)
  255. if err != nil {
  256. return Response{}, err
  257. }
  258. defer resp.Body.Close()
  259. body, err := ioutil.ReadAll(resp.Body)
  260. if err != nil {
  261. return Response{}, err
  262. }
  263. if resp.StatusCode > 201 && resp.StatusCode < 400 {
  264. return Response{}, errors.New(string(body))
  265. }
  266. esResp := new(Response)
  267. err = json.Unmarshal(body, &esResp)
  268. if err != nil {
  269. return Response{}, err
  270. }
  271. if esResp.Error != "" {
  272. return Response{}, &SearchError{esResp.Error, esResp.Status}
  273. }
  274. return *esResp, nil
  275. }
  276. // Url builds a Request for a URL
  277. func (r *Request) Url() string {
  278. path := "/" + strings.Join(r.IndexList, ",")
  279. if len(r.TypeList) > 0 {
  280. path += "/" + strings.Join(r.TypeList, ",")
  281. }
  282. // XXX : for indexing documents using the normal (non bulk) API
  283. if len(r.api) == 0 && len(r.id) > 0 {
  284. path += "/" + r.id
  285. }
  286. path += "/" + r.api
  287. u := url.URL{
  288. Scheme: "http",
  289. Host: fmt.Sprintf("%s:%s", r.Conn.Host, r.Conn.Port),
  290. Path: path,
  291. RawQuery: r.ExtraArgs.Encode(),
  292. }
  293. return u.String()
  294. }
  295. // Buckets returns list of buckets in aggregation
  296. func (a Aggregation) Buckets() []Bucket {
  297. result := []Bucket{}
  298. if buckets, ok := a["buckets"]; ok {
  299. for _, bucket := range buckets.([]interface {}) {
  300. result = append(result, bucket.(map[string]interface{}))
  301. }
  302. }
  303. return result
  304. }
  305. // Key returns key for aggregation bucket
  306. func (b Bucket) Key() interface{} {
  307. return b["key"]
  308. }
  309. // DocCount returns count of documents in this bucket
  310. func (b Bucket) DocCount() uint64 {
  311. return uint64(b["doc_count"].(float64))
  312. }
  313. // Aggregation returns aggregation by name from bucket
  314. func (b Bucket) Aggregation(name string) Aggregation{
  315. if agg, ok := b[name]; ok {
  316. return agg.(map[string]interface{})
  317. } else {
  318. return Aggregation{}
  319. }
  320. }