You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

383 lines
8.8 KiB

  1. // Copyright 2013 Belogik. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package goes provides an API to access Elasticsearch.
  5. package goes
  6. import (
  7. "bytes"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "io/ioutil"
  12. "net/http"
  13. "net/url"
  14. "strconv"
  15. "strings"
  16. )
// Bulk command names.
//
// BulkSend uses each document's BulkCommand as the action key of its bulk
// entry; presumably these are the values callers are expected to put there
// (confirm against the Document type, which is declared elsewhere).
const (
	// BULK_COMMAND_INDEX is the "index" bulk action name.
	BULK_COMMAND_INDEX = "index"
	// BULK_COMMAND_DELETE is the "delete" bulk action name.
	BULK_COMMAND_DELETE = "delete"
)
  21. func (err *SearchError) Error() string {
  22. return fmt.Sprintf("[%d] %s", err.StatusCode, err.Msg)
  23. }
  24. // NewConnection initiates a new Connection to an elasticsearch server
  25. //
  26. // This function is pretty useless for now but might be useful in a near future
  27. // if wee need more features like connection pooling or load balancing.
  28. func NewConnection(host string, port string) *Connection {
  29. return &Connection{host, port}
  30. }
  31. // CreateIndex creates a new index represented by a name and a mapping
  32. func (c *Connection) CreateIndex(name string, mapping map[string]interface{}) (Response, error) {
  33. r := Request{
  34. Conn: c,
  35. Query: mapping,
  36. IndexList: []string{name},
  37. method: "PUT",
  38. }
  39. return r.Run()
  40. }
  41. // DeleteIndex deletes an index represented by a name
  42. func (c *Connection) DeleteIndex(name string) (Response, error) {
  43. r := Request{
  44. Conn: c,
  45. IndexList: []string{name},
  46. method: "DELETE",
  47. }
  48. return r.Run()
  49. }
  50. // RefreshIndex refreshes an index represented by a name
  51. func (c *Connection) RefreshIndex(name string) (Response, error) {
  52. r := Request{
  53. Conn: c,
  54. IndexList: []string{name},
  55. method: "POST",
  56. api: "_refresh",
  57. }
  58. return r.Run()
  59. }
  60. // Stats fetches statistics (_stats) for the current elasticsearch server
  61. func (c *Connection) Stats(indexList []string, extraArgs url.Values) (Response, error) {
  62. r := Request{
  63. Conn: c,
  64. IndexList: indexList,
  65. ExtraArgs: extraArgs,
  66. method: "GET",
  67. api: "_stats",
  68. }
  69. return r.Run()
  70. }
  71. // IndexStatus fetches the status (_status) for the indices defined in
  72. // indexList. Use _all in indexList to get stats for all indices
  73. func (c *Connection) IndexStatus(indexList []string) (Response, error) {
  74. r := Request{
  75. Conn: c,
  76. IndexList: indexList,
  77. method: "GET",
  78. api: "_status",
  79. }
  80. return r.Run()
  81. }
  82. // Bulk adds multiple documents in bulk mode to the index for a given type
  83. func (c *Connection) BulkSend(index string, documents []Document) (Response, error) {
  84. // We do not generate a traditionnal JSON here (often a one liner)
  85. // Elasticsearch expects one line of JSON per line (EOL = \n)
  86. // plus an extra \n at the very end of the document
  87. //
  88. // More informations about the Bulk JSON format for Elasticsearch:
  89. //
  90. // - http://www.elasticsearch.org/guide/reference/api/bulk.html
  91. //
  92. // This is quite annoying for us as we can not use the simple JSON
  93. // Marshaler available in Run().
  94. //
  95. // We have to generate this special JSON by ourselves which leads to
  96. // the code below.
  97. //
  98. // I know it is unreadable I must find an elegant way to fix this.
  99. // len(documents) * 2 : action + optional_sources
  100. // + 1 : room for the trailing \n
  101. bulkData := make([][]byte, len(documents)*2+1)
  102. i := 0
  103. for _, doc := range documents {
  104. action, err := json.Marshal(map[string]interface{}{
  105. doc.BulkCommand: map[string]interface{}{
  106. "_index": doc.Index,
  107. "_type": doc.Type,
  108. "_id": doc.Id,
  109. },
  110. })
  111. if err != nil {
  112. return Response{}, err
  113. }
  114. bulkData[i] = action
  115. i++
  116. if len(doc.Fields) > 0 {
  117. fields := make(map[string]interface{}, len(doc.Fields))
  118. for fieldName, fieldValue := range doc.Fields {
  119. fields[fieldName] = fieldValue
  120. }
  121. sources, err := json.Marshal(fields)
  122. if err != nil {
  123. return Response{}, err
  124. }
  125. bulkData[i] = sources
  126. i++
  127. }
  128. }
  129. // forces an extra trailing \n absolutely necessary for elasticsearch
  130. bulkData[len(bulkData)-1] = []byte(nil)
  131. r := Request{
  132. Conn: c,
  133. IndexList: []string{index},
  134. method: "POST",
  135. api: "_bulk",
  136. bulkData: bytes.Join(bulkData, []byte("\n")),
  137. }
  138. return r.Run()
  139. }
  140. // Search executes a search query against an index
  141. func (c *Connection) Search(query map[string]interface{}, indexList []string, typeList []string, extraArgs url.Values) (Response, error) {
  142. r := Request{
  143. Conn: c,
  144. Query: query,
  145. IndexList: indexList,
  146. TypeList: typeList,
  147. method: "POST",
  148. api: "_search",
  149. ExtraArgs: extraArgs,
  150. }
  151. return r.Run()
  152. }
  153. // Scan starts scroll over an index
  154. func (c *Connection) Scan(query map[string]interface{}, indexList []string, typeList []string, timeout string, size int) (Response, error) {
  155. v := url.Values{}
  156. v.Add("search_type", "scan")
  157. v.Add("scroll", timeout)
  158. v.Add("size", strconv.Itoa(size))
  159. r := Request{
  160. Conn: c,
  161. Query: query,
  162. IndexList: indexList,
  163. TypeList: typeList,
  164. method: "POST",
  165. api: "_search",
  166. ExtraArgs: v,
  167. }
  168. return r.Run()
  169. }
  170. // Scroll fetches data by scroll id
  171. func (c *Connection) Scroll(scrollId string, timeout string) (Response, error) {
  172. v := url.Values{}
  173. v.Add("scroll", timeout)
  174. r := Request{
  175. Conn: c,
  176. method: "POST",
  177. api: "_search/scroll",
  178. ExtraArgs: v,
  179. Body: []byte(scrollId),
  180. }
  181. return r.Run()
  182. }
  183. // Get a typed document by its id
  184. func (c *Connection) Get(index string, documentType string, id string, extraArgs url.Values) (Response, error) {
  185. r := Request{
  186. Conn: c,
  187. IndexList: []string{index},
  188. method: "GET",
  189. api: documentType + "/" + id,
  190. ExtraArgs: extraArgs,
  191. }
  192. return r.Run()
  193. }
  194. // Index indexes a Document
  195. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  196. // URL arguments, for example, to control routing, ttl, version, op_type, etc.
  197. func (c *Connection) Index(d Document, extraArgs url.Values) (Response, error) {
  198. r := Request{
  199. Conn: c,
  200. Query: d.Fields,
  201. IndexList: []string{d.Index.(string)},
  202. TypeList: []string{d.Type},
  203. ExtraArgs: extraArgs,
  204. method: "POST",
  205. }
  206. if d.Id != nil {
  207. r.method = "PUT"
  208. r.id = d.Id.(string)
  209. }
  210. return r.Run()
  211. }
  212. // Delete deletes a Document d
  213. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  214. // URL arguments, for example, to control routing.
  215. func (c *Connection) Delete(d Document, extraArgs url.Values) (Response, error) {
  216. r := Request{
  217. Conn: c,
  218. IndexList: []string{d.Index.(string)},
  219. TypeList: []string{d.Type},
  220. ExtraArgs: extraArgs,
  221. method: "DELETE",
  222. id: d.Id.(string),
  223. }
  224. return r.Run()
  225. }
  226. // Run executes an elasticsearch Request. It converts data to Json, sends the
  227. // request and return the Response obtained
  228. func (req *Request) Run() (Response, error) {
  229. postData := []byte{}
  230. // XXX : refactor this
  231. if len(req.Body) > 0 {
  232. postData = req.Body
  233. } else if req.api == "_bulk" {
  234. postData = req.bulkData
  235. } else {
  236. b, err := json.Marshal(req.Query)
  237. if err != nil {
  238. return Response{}, err
  239. }
  240. postData = b
  241. }
  242. reader := bytes.NewReader(postData)
  243. client := http.DefaultClient
  244. newReq, err := http.NewRequest(req.method, req.Url(), reader)
  245. if err != nil {
  246. return Response{}, err
  247. }
  248. if req.method == "POST" || req.method == "PUT" {
  249. newReq.Header.Set("Content-Type", "application/x-www-form-urlencoded")
  250. }
  251. resp, err := client.Do(newReq)
  252. if err != nil {
  253. return Response{}, err
  254. }
  255. defer resp.Body.Close()
  256. body, err := ioutil.ReadAll(resp.Body)
  257. if err != nil {
  258. return Response{}, err
  259. }
  260. if resp.StatusCode > 201 && resp.StatusCode < 400 {
  261. return Response{}, errors.New(string(body))
  262. }
  263. esResp := new(Response)
  264. err = json.Unmarshal(body, &esResp)
  265. if err != nil {
  266. return Response{}, err
  267. }
  268. if esResp.Error != "" {
  269. return Response{}, &SearchError{esResp.Error, esResp.Status}
  270. }
  271. return *esResp, nil
  272. }
  273. // Url builds a Request for a URL
  274. func (r *Request) Url() string {
  275. path := "/" + strings.Join(r.IndexList, ",")
  276. if len(r.TypeList) > 0 {
  277. path += "/" + strings.Join(r.TypeList, ",")
  278. }
  279. // XXX : for indexing documents using the normal (non bulk) API
  280. if len(r.api) == 0 && len(r.id) > 0 {
  281. path += "/" + r.id
  282. }
  283. path += "/" + r.api
  284. u := url.URL{
  285. Scheme: "http",
  286. Host: fmt.Sprintf("%s:%s", r.Conn.Host, r.Conn.Port),
  287. Path: path,
  288. RawQuery: r.ExtraArgs.Encode(),
  289. }
  290. return u.String()
  291. }
  292. // Buckets returns list of buckets in aggregation
  293. func (a Aggregation) Buckets() []Bucket {
  294. result := []Bucket{}
  295. if buckets, ok := a["buckets"]; ok {
  296. for _, bucket := range buckets.([]interface {}) {
  297. result = append(result, bucket.(map[string]interface{}))
  298. }
  299. }
  300. return result
  301. }
  302. // Key returns key for aggregation bucket
  303. func (b Bucket) Key() interface{} {
  304. return b["key"]
  305. }
  306. // DocCount returns count of documents in this bucket
  307. func (b Bucket) DocCount() uint64 {
  308. return uint64(b["doc_count"].(float64))
  309. }
  310. // Aggregation returns aggregation by name from bucket
  311. func (b Bucket) Aggregation(name string) Aggregation{
  312. if agg, ok := b[name]; ok {
  313. return agg.(map[string]interface{})
  314. } else {
  315. return Aggregation{}
  316. }
  317. }