You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

351 lines
8.0 KiB

  1. // Copyright 2013 Belogik. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package goes provides an API to access Elasticsearch.
  5. package goes
  6. import (
  7. "bytes"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "io/ioutil"
  12. "net/http"
  13. "net/url"
  14. "strconv"
  15. "strings"
  16. )
  // BULK_COMMAND_INDEX is the name of the bulk "index" action. BulkSend uses a
  // document's BulkCommand as the action key of each bulk line; presumably
  // BulkCommand is set to one of these constants.
  const BULK_COMMAND_INDEX = "index"

  // BULK_COMMAND_DELETE is the name of the bulk "delete" action.
  const BULK_COMMAND_DELETE = "delete"
  21. func (err *SearchError) Error() string {
  22. return fmt.Sprintf("[%d] %s", err.StatusCode, err.Msg)
  23. }
  24. // NewConnection initiates a new Connection to an elasticsearch server
  25. //
  26. // This function is pretty useless for now but might be useful in a near future
  27. // if wee need more features like connection pooling or load balancing.
  28. func NewConnection(host string, port string) *Connection {
  29. return &Connection{host, port}
  30. }
  31. // CreateIndex creates a new index represented by a name and a mapping
  32. func (c *Connection) CreateIndex(name string, mapping map[string]interface{}) (Response, error) {
  33. r := Request{
  34. Conn: c,
  35. Query: mapping,
  36. IndexList: []string{name},
  37. method: "PUT",
  38. }
  39. return r.Run()
  40. }
  41. // DeleteIndex deletes an index represented by a name
  52. r := Request{
  53. Conn: c,
  54. IndexList: []string{name},
  55. method: "POST",
  56. api: "_refresh",
  57. }
  58. return r.Run()
  59. }
  60. // Stats fetches statistics (_stats) for the current elasticsearch server
  61. func (c *Connection) Stats(indexList []string, extraArgs url.Values) (Response, error) {
  62. r := Request{
  63. Conn: c,
  64. IndexList: indexList,
  65. ExtraArgs: extraArgs,
  66. method: "GET",
  67. api: "_stats",
  68. }
  69. return r.Run()
  70. }
  71. // IndexStatus fetches the status (_status) for the indices defined in
  72. // indexList. Use _all in indexList to get stats for all indices
  73. func (c *Connection) IndexStatus(indexList []string) (Response, error) {
  74. r := Request{
  75. Conn: c,
  76. IndexList: indexList,
  77. method: "GET",
  78. api: "_status",
  79. }
  80. return r.Run()
  81. }
  82. // Bulk adds multiple documents in bulk mode to the index for a given type
  83. func (c *Connection) BulkSend(index string, documents []Document) (Response, error) {
  84. // We do not generate a traditionnal JSON here (often a one liner)
  85. // Elasticsearch expects one line of JSON per line (EOL = \n)
  86. // plus an extra \n at the very end of the document
  87. //
  88. // More informations about the Bulk JSON format for Elasticsearch:
  89. //
  90. // - http://www.elasticsearch.org/guide/reference/api/bulk.html
  91. //
  92. // This is quite annoying for us as we can not use the simple JSON
  93. // Marshaler available in Run().
  94. //
  95. // We have to generate this special JSON by ourselves which leads to
  96. // the code below.
  97. //
  98. // I know it is unreadable I must find an elegant way to fix this.
  99. // len(documents) * 2 : action + optional_sources
  100. // + 1 : room for the trailing \n
  101. bulkData := make([][]byte, len(documents)*2+1)
  102. i := 0
  103. for _, doc := range documents {
  104. action, err := json.Marshal(map[string]interface{}{
  105. doc.BulkCommand: map[string]interface{}{
  106. "_index": doc.Index,
  107. "_type": doc.Type,
  108. "_id": doc.Id,
  109. },
  110. })
  111. if err != nil {
  112. return Response{}, err
  113. }
  114. bulkData[i] = action
  115. i++
  116. if len(doc.Fields) > 0 {
  117. fields := make(map[string]interface{}, len(doc.Fields))
  118. for fieldName, fieldValue := range doc.Fields {
  119. fields[fieldName] = fieldValue
  120. }
  121. sources, err := json.Marshal(fields)
  122. if err != nil {
  123. return Response{}, err
  124. }
  125. bulkData[i] = sources
  126. i++
  127. }
  128. }
  129. // forces an extra trailing \n absolutely necessary for elasticsearch
  130. bulkData[len(bulkData)-1] = []byte(nil)
  131. r := Request{
  132. Conn: c,
  133. IndexList: []string{index},
  134. method: "POST",
  135. api: "_bulk",
  136. bulkData: bytes.Join(bulkData, []byte("\n")),
  137. }
  138. return r.Run()
  139. }
  140. // Search executes a search query against an index
  141. func (c *Connection) Search(query map[string]interface{}, indexList []string, typeList []string) (Response, error) {
  142. r := Request{
  143. Conn: c,
  144. Query: query,
  145. IndexList: indexList,
  146. TypeList: typeList,
  147. method: "POST",
  148. api: "_search",
  149. }
  150. return r.Run()
  151. }
  152. // Scan starts scroll over an index
  153. func (c *Connection) Scan(query map[string]interface{}, indexList []string, typeList []string, timeout string, size int) (Response, error) {
  154. v := url.Values{}
  155. v.Add("search_type", "scan")
  156. v.Add("scroll", timeout)
  157. v.Add("size", strconv.Itoa(size))
  158. r := Request{
  159. Conn: c,
  160. Query: query,
  161. IndexList: indexList,
  162. TypeList: typeList,
  163. method: "POST",
  164. api: "_search",
  165. ExtraArgs: v,
  166. }
  167. return r.Run()
  168. }
  169. // Scroll fetches data by scroll id
  170. func (c *Connection) Scroll(scrollId string, timeout string) (Response, error) {
  171. v := url.Values{}
  172. v.Add("scroll", timeout)
  173. r := Request{
  174. Conn: c,
  175. method: "POST",
  176. api: "_search/scroll",
  177. ExtraArgs: v,
  178. Body: []byte(scrollId),
  179. }
  180. return r.Run()
  181. }
  182. // Get a typed document by its id
  183. func (c *Connection) Get(index string, documentType string, id string, extraArgs url.Values) (Response, error) {
  184. r := Request{
  185. Conn: c,
  186. IndexList: []string{index},
  187. method: "GET",
  188. api: documentType + "/" + id,
  189. ExtraArgs: extraArgs,
  190. }
  191. return r.Run()
  192. }
  193. // Index indexes a Document
  194. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  195. // URL arguments, for example, to control routing, ttl, version, op_type, etc.
  196. func (c *Connection) Index(d Document, extraArgs url.Values) (Response, error) {
  197. r := Request{
  198. Conn: c,
  199. Query: d.Fields,
  200. IndexList: []string{d.Index.(string)},
  201. TypeList: []string{d.Type},
  202. ExtraArgs: extraArgs,
  203. method: "POST",
  204. }
  205. if d.Id != nil {
  206. r.method = "PUT"
  207. r.id = d.Id.(string)
  208. }
  209. return r.Run()
  210. }
  211. // Delete deletes a Document d
  212. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  213. // URL arguments, for example, to control routing.
  214. func (c *Connection) Delete(d Document, extraArgs url.Values) (Response, error) {
  215. r := Request{
  216. Conn: c,
  217. IndexList: []string{d.Index.(string)},
  218. TypeList: []string{d.Type},
  219. ExtraArgs: extraArgs,
  220. method: "DELETE",
  221. id: d.Id.(string),
  222. }
  223. return r.Run()
  224. }
  225. // Run executes an elasticsearch Request. It converts data to Json, sends the
  226. // request and return the Response obtained
  227. func (req *Request) Run() (Response, error) {
  228. postData := []byte{}
  229. // XXX : refactor this
  230. if len(req.Body) > 0 {
  231. postData = req.Body
  232. } else if req.api == "_bulk" {
  233. postData = req.bulkData
  234. } else {
  235. b, err := json.Marshal(req.Query)
  236. if err != nil {
  237. return Response{}, err
  238. }
  239. postData = b
  240. }
  241. reader := bytes.NewReader(postData)
  242. client := http.DefaultClient
  243. newReq, err := http.NewRequest(req.method, req.Url(), reader)
  244. if err != nil {
  245. return Response{}, err
  246. }
  247. if req.method == "POST" || req.method == "PUT" {
  248. newReq.Header.Set("Content-Type", "application/x-www-form-urlencoded")
  249. }
  250. resp, err := client.Do(newReq)
  251. if err != nil {
  252. return Response{}, err
  253. }
  254. defer resp.Body.Close()
  255. body, err := ioutil.ReadAll(resp.Body)
  256. if err != nil {
  257. return Response{}, err
  258. }
  259. if resp.StatusCode > 201 && resp.StatusCode < 400 {
  260. return Response{}, errors.New(string(body))
  261. }
  262. esResp := new(Response)
  263. err = json.Unmarshal(body, &esResp)
  264. if err != nil {
  265. return Response{}, err
  266. }
  267. if esResp.Error != "" {
  268. return Response{}, &SearchError{esResp.Error, esResp.Status}
  269. }
  270. return *esResp, nil
  271. }
  272. // Url builds a Request for a URL
  273. func (r *Request) Url() string {
  274. path := "/" + strings.Join(r.IndexList, ",")
  275. if len(r.TypeList) > 0 {
  276. path += "/" + strings.Join(r.TypeList, ",")
  277. }
  278. // XXX : for indexing documents using the normal (non bulk) API
  279. if len(r.api) == 0 && len(r.id) > 0 {
  280. path += "/" + r.id
  281. }
  282. path += "/" + r.api
  283. u := url.URL{
  284. Scheme: "http",
  285. Host: fmt.Sprintf("%s:%s", r.Conn.Host, r.Conn.Port),
  286. Path: path,
  287. RawQuery: r.ExtraArgs.Encode(),
  288. }
  289. return u.String()
  290. }