You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

290 lines
6.5 KiB

  1. // Copyright 2013 Belogik. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package goes provides an API to access Elasticsearch.
  5. package goes
  6. import (
  7. "bytes"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "io/ioutil"
  12. "net/http"
  13. "net/url"
  14. "strings"
  15. )
  // Names of the bulk actions. BulkSend writes a document's BulkCommand as the
  // key of the action line it generates for that document.
  const (
  	// BULK_COMMAND_INDEX is the "index" bulk action name.
  	BULK_COMMAND_INDEX = "index"

  	// BULK_COMMAND_DELETE is the "delete" bulk action name.
  	BULK_COMMAND_DELETE = "delete"
  )
  20. func (err *SearchError) Error() string {
  21. return fmt.Sprintf("[%d] %s", err.StatusCode, err.Msg)
  22. }
  23. // NewConnection initiates a new Connection to an elasticsearch server
  24. //
  25. // This function is pretty useless for now but might be useful in a near future
  26. // if wee need more features like connection pooling or load balancing.
  27. func NewConnection(host string, port string) *Connection {
  28. return &Connection{host, port}
  29. }
  30. // CreateIndex creates a new index represented by a name and a mapping
  31. func (c *Connection) CreateIndex(name string, mapping map[string]interface{}) (Response, error) {
  32. r := Request{
  33. Conn: c,
  34. Query: mapping,
  35. IndexList: []string{name},
  36. method: "PUT",
  37. }
  38. return r.Run()
  39. }
  40. // DeleteIndex deletes an index represented by a name
  41. func (c *Connection) DeleteIndex(name string) (Response, error) {
  42. r := Request{
  43. Conn: c,
  44. IndexList: []string{name},
  45. method: "DELETE",
  46. }
  47. return r.Run()
  48. }
  49. // RefreshIndex refreshes an index represented by a name
  50. func (c *Connection) RefreshIndex(name string) (Response, error) {
  51. r := Request{
  52. Conn: c,
  53. IndexList: []string{name},
  54. method: "POST",
  55. api: "_refresh",
  56. }
  57. return r.Run()
  58. }
  59. func (c *Connection) FetchStats() (Response, error) {
  60. r := Request{
  61. Conn: c,
  62. method: "GET",
  63. api: "_stats",
  64. }
  65. return r.Run()
  66. }
  67. // Bulk adds multiple documents in bulk mode to the index for a given type
  68. func (c *Connection) BulkSend(index string, documents []Document) (Response, error) {
  69. // We do not generate a traditionnal JSON here (often a one liner)
  70. // Elasticsearch expects one line of JSON per line (EOL = \n)
  71. // plus an extra \n at the very end of the document
  72. //
  73. // More informations about the Bulk JSON format for Elasticsearch:
  74. //
  75. // - http://www.elasticsearch.org/guide/reference/api/bulk.html
  76. //
  77. // This is quite annoying for us as we can not use the simple JSON
  78. // Marshaler available in Run().
  79. //
  80. // We have to generate this special JSON by ourselves which leads to
  81. // the code below.
  82. //
  83. // I know it is unreadable I must find an elegant way to fix this.
  84. bulkData := []byte{}
  85. for _, doc := range documents {
  86. header := map[string]interface{}{
  87. doc.BulkCommand: map[string]interface{}{
  88. "_index": doc.Index,
  89. "_type": doc.Type,
  90. "_id": doc.Id,
  91. },
  92. }
  93. temp, err := json.Marshal(header)
  94. if err != nil {
  95. return Response{}, err
  96. }
  97. temp = append(temp, '\n')
  98. bulkData = append(bulkData, temp[:]...)
  99. if len(doc.Fields) > 0 {
  100. fields := map[string]interface{}{}
  101. for fieldName, fieldValue := range doc.Fields {
  102. fields[fieldName] = fieldValue
  103. }
  104. temp, err = json.Marshal(fields)
  105. if err != nil {
  106. return Response{}, err
  107. }
  108. temp = append(temp, '\n')
  109. bulkData = append(bulkData, temp[:]...)
  110. }
  111. }
  112. r := Request{
  113. Conn: c,
  114. IndexList: []string{index},
  115. method: "POST",
  116. api: "_bulk",
  117. bulkData: bulkData,
  118. }
  119. return r.Run()
  120. }
  121. // Search executes a search query against an index
  122. func (c *Connection) Search(query map[string]interface{}, indexList []string, typeList []string) (Response, error) {
  123. r := Request{
  124. Conn: c,
  125. Query: query,
  126. IndexList: indexList,
  127. TypeList: typeList,
  128. method: "POST",
  129. api: "_search",
  130. }
  131. return r.Run()
  132. }
  133. // Get a typed document by its id
  134. func (c *Connection) Get(index string, documentType string, id string, extraArgs url.Values) (Response, error) {
  135. r := Request{
  136. Conn: c,
  137. IndexList: []string{index},
  138. method: "GET",
  139. api: documentType + "/" + id,
  140. ExtraArgs: extraArgs,
  141. }
  142. return r.Run()
  143. }
  144. // Index indexes a Document
  145. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  146. // URL arguments, for example, to control routing, ttl, version, op_type, etc.
  147. func (c *Connection) Index(d Document, extraArgs url.Values) (Response, error) {
  148. r := Request{
  149. Conn: c,
  150. Query: d.Fields,
  151. IndexList: []string{d.Index.(string)},
  152. TypeList: []string{d.Type},
  153. ExtraArgs: extraArgs,
  154. method: "POST",
  155. }
  156. if d.Id != nil {
  157. r.method = "PUT"
  158. r.id = d.Id.(string)
  159. }
  160. return r.Run()
  161. }
  162. // Delete deletes a Document d
  163. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  164. // URL arguments, for example, to control routing.
  165. func (c *Connection) Delete(d Document, extraArgs url.Values) (Response, error) {
  166. r := Request{
  167. Conn: c,
  168. IndexList: []string{d.Index.(string)},
  169. TypeList: []string{d.Type},
  170. ExtraArgs: extraArgs,
  171. method: "DELETE",
  172. id: d.Id.(string),
  173. }
  174. return r.Run()
  175. }
  176. // Run executes an elasticsearch Request. It converts data to Json, sends the
  177. // request and return the Response obtained
  178. func (req *Request) Run() (Response, error) {
  179. postData := []byte{}
  180. // XXX : refactor this
  181. if req.api == "_bulk" {
  182. postData = req.bulkData
  183. } else {
  184. b, err := json.Marshal(req.Query)
  185. if err != nil {
  186. return Response{}, err
  187. }
  188. postData = b
  189. }
  190. reader := bytes.NewReader(postData)
  191. client := http.DefaultClient
  192. newReq, err := http.NewRequest(req.method, req.Url(), reader)
  193. if err != nil {
  194. return Response{}, err
  195. }
  196. if req.method == "POST" || req.method == "PUT" {
  197. newReq.Header.Set("Content-Type", "application/x-www-form-urlencoded")
  198. }
  199. resp, err := client.Do(newReq)
  200. if err != nil {
  201. return Response{}, err
  202. }
  203. defer resp.Body.Close()
  204. body, err := ioutil.ReadAll(resp.Body)
  205. if err != nil {
  206. return Response{}, err
  207. }
  208. if resp.StatusCode > 201 && resp.StatusCode < 400 {
  209. return Response{}, errors.New(string(body))
  210. }
  211. esResp := new(Response)
  212. err = json.Unmarshal(body, &esResp)
  213. if err != nil {
  214. return Response{}, err
  215. }
  216. if esResp.Error != "" {
  217. return Response{}, &SearchError{esResp.Error, esResp.Status}
  218. }
  219. return *esResp, nil
  220. }
  221. // Url builds a Request for a URL
  222. func (r *Request) Url() string {
  223. path := "/" + strings.Join(r.IndexList, ",")
  224. if len(r.TypeList) > 0 {
  225. path += "/" + strings.Join(r.TypeList, ",")
  226. }
  227. // XXX : for indexing documents using the normal (non bulk) API
  228. if len(r.api) == 0 && len(r.id) > 0 {
  229. path += "/" + r.id
  230. }
  231. path += "/" + r.api
  232. u := url.URL{
  233. Scheme: "http",
  234. Host: fmt.Sprintf("%s:%s", r.Conn.Host, r.Conn.Port),
  235. Path: path,
  236. RawQuery: r.ExtraArgs.Encode(),
  237. }
  238. return u.String()
  239. }