You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

486 lines
11 KiB

  1. // Copyright 2013 Belogik. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package goes provides an API to access Elasticsearch.
  5. package goes
  6. import (
  7. "bytes"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "io/ioutil"
  12. "net/http"
  13. "net/url"
  14. "reflect"
  15. "strconv"
  16. "strings"
  17. )
  // Bulk action names. BulkSend uses a document's BulkCommand as the key of the
  // action line it generates; these constants are presumably the intended
  // values for that field.
  const (
  	// BULK_COMMAND_INDEX is the name of the "index" bulk action.
  	BULK_COMMAND_INDEX = "index"

  	// BULK_COMMAND_DELETE is the name of the "delete" bulk action.
  	BULK_COMMAND_DELETE = "delete"
  )
  22. func (err *SearchError) Error() string {
  23. return fmt.Sprintf("[%d] %s", err.StatusCode, err.Msg)
  24. }
  25. // NewConnection initiates a new Connection to an elasticsearch server
  26. //
  27. // This function is pretty useless for now but might be useful in a near future
  28. // if wee need more features like connection pooling or load balancing.
  29. func NewConnection(host string, port string) *Connection {
  30. return &Connection{host, port, http.DefaultClient}
  31. }
  32. func (c *Connection) WithClient(cl *http.Client) *Connection {
  33. c.Client = cl
  34. return c
  35. }
  36. // CreateIndex creates a new index represented by a name and a mapping
  37. func (c *Connection) CreateIndex(name string, mapping map[string]interface{}) (Response, error) {
  38. r := Request{
  39. Conn: c,
  40. Query: mapping,
  41. IndexList: []string{name},
  42. method: "PUT",
  43. }
  44. return r.Run()
  45. }
  46. // DeleteIndex deletes an index represented by a name
  47. func (c *Connection) DeleteIndex(name string) (Response, error) {
  48. r := Request{
  49. Conn: c,
  50. IndexList: []string{name},
  51. method: "DELETE",
  52. }
  53. return r.Run()
  54. }
  55. // RefreshIndex refreshes an index represented by a name
  56. func (c *Connection) RefreshIndex(name string) (Response, error) {
  57. r := Request{
  58. Conn: c,
  59. IndexList: []string{name},
  60. method: "POST",
  61. api: "_refresh",
  62. }
  63. return r.Run()
  64. }
  65. // Stats fetches statistics (_stats) for the current elasticsearch server
  66. func (c *Connection) Stats(indexList []string, extraArgs url.Values) (Response, error) {
  67. r := Request{
  68. Conn: c,
  69. IndexList: indexList,
  70. ExtraArgs: extraArgs,
  71. method: "GET",
  72. api: "_stats",
  73. }
  74. return r.Run()
  75. }
  76. // IndexStatus fetches the status (_status) for the indices defined in
  77. // indexList. Use _all in indexList to get stats for all indices
  78. func (c *Connection) IndexStatus(indexList []string) (Response, error) {
  79. r := Request{
  80. Conn: c,
  81. IndexList: indexList,
  82. method: "GET",
  83. api: "_status",
  84. }
  85. return r.Run()
  86. }
  87. // Bulk adds multiple documents in bulk mode
  88. func (c *Connection) BulkSend(documents []Document) (Response, error) {
  89. // We do not generate a traditional JSON here (often a one liner)
  90. // Elasticsearch expects one line of JSON per line (EOL = \n)
  91. // plus an extra \n at the very end of the document
  92. //
  93. // More informations about the Bulk JSON format for Elasticsearch:
  94. //
  95. // - http://www.elasticsearch.org/guide/reference/api/bulk.html
  96. //
  97. // This is quite annoying for us as we can not use the simple JSON
  98. // Marshaler available in Run().
  99. //
  100. // We have to generate this special JSON by ourselves which leads to
  101. // the code below.
  102. //
  103. // I know it is unreadable I must find an elegant way to fix this.
  104. // len(documents) * 2 : action + optional_sources
  105. // + 1 : room for the trailing \n
  106. bulkData := make([][]byte, len(documents)*2+1)
  107. i := 0
  108. for _, doc := range documents {
  109. action, err := json.Marshal(map[string]interface{}{
  110. doc.BulkCommand: map[string]interface{}{
  111. "_index": doc.Index,
  112. "_type": doc.Type,
  113. "_id": doc.Id,
  114. },
  115. })
  116. if err != nil {
  117. return Response{}, err
  118. }
  119. bulkData[i] = action
  120. i++
  121. if doc.Fields != nil {
  122. if docFields, ok := doc.Fields.(map[string]interface{}); ok {
  123. if len(docFields) == 0 {
  124. continue
  125. }
  126. } else {
  127. typeOfFields := reflect.TypeOf(doc.Fields)
  128. if typeOfFields.Kind() == reflect.Ptr {
  129. typeOfFields = typeOfFields.Elem()
  130. }
  131. if typeOfFields.Kind() != reflect.Struct {
  132. return Response{}, fmt.Errorf("Document fields not in struct or map[string]interface{} format")
  133. }
  134. if typeOfFields.NumField() == 0 {
  135. continue
  136. }
  137. }
  138. sources, err := json.Marshal(doc.Fields)
  139. if err != nil {
  140. return Response{}, err
  141. }
  142. bulkData[i] = sources
  143. i++
  144. }
  145. }
  146. // forces an extra trailing \n absolutely necessary for elasticsearch
  147. bulkData[len(bulkData)-1] = []byte(nil)
  148. r := Request{
  149. Conn: c,
  150. method: "POST",
  151. api: "_bulk",
  152. bulkData: bytes.Join(bulkData, []byte("\n")),
  153. }
  154. return r.Run()
  155. }
  156. // Search executes a search query against an index
  157. func (c *Connection) Search(query map[string]interface{}, indexList []string, typeList []string, extraArgs url.Values) (Response, error) {
  158. r := Request{
  159. Conn: c,
  160. Query: query,
  161. IndexList: indexList,
  162. TypeList: typeList,
  163. method: "POST",
  164. api: "_search",
  165. ExtraArgs: extraArgs,
  166. }
  167. return r.Run()
  168. }
  169. //Query runs a query against an index using the provided http method.
  170. //This method can be used to execute a delete by query, just pass in "DELETE"
  171. //for the HTTP method.
  172. func (c *Connection) Query(query map[string]interface{}, indexList []string, typeList []string, httpMethod string, extraArgs url.Values) (Response, error) {
  173. r := Request{
  174. Conn: c,
  175. Query: query,
  176. IndexList: indexList,
  177. TypeList: typeList,
  178. method: httpMethod,
  179. api: "_query",
  180. ExtraArgs: extraArgs,
  181. }
  182. return r.Run()
  183. }
  184. // Scan starts scroll over an index
  185. func (c *Connection) Scan(query map[string]interface{}, indexList []string, typeList []string, timeout string, size int) (Response, error) {
  186. v := url.Values{}
  187. v.Add("search_type", "scan")
  188. v.Add("scroll", timeout)
  189. v.Add("size", strconv.Itoa(size))
  190. r := Request{
  191. Conn: c,
  192. Query: query,
  193. IndexList: indexList,
  194. TypeList: typeList,
  195. method: "POST",
  196. api: "_search",
  197. ExtraArgs: v,
  198. }
  199. return r.Run()
  200. }
  201. // Scroll fetches data by scroll id
  202. func (c *Connection) Scroll(scrollId string, timeout string) (Response, error) {
  203. v := url.Values{}
  204. v.Add("scroll", timeout)
  205. r := Request{
  206. Conn: c,
  207. method: "POST",
  208. api: "_search/scroll",
  209. ExtraArgs: v,
  210. Body: []byte(scrollId),
  211. }
  212. return r.Run()
  213. }
  214. // Get a typed document by its id
  215. func (c *Connection) Get(index string, documentType string, id string, extraArgs url.Values) (Response, error) {
  216. r := Request{
  217. Conn: c,
  218. IndexList: []string{index},
  219. method: "GET",
  220. api: documentType + "/" + id,
  221. ExtraArgs: extraArgs,
  222. }
  223. return r.Run()
  224. }
  225. // Index indexes a Document
  226. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  227. // URL arguments, for example, to control routing, ttl, version, op_type, etc.
  228. func (c *Connection) Index(d Document, extraArgs url.Values) (Response, error) {
  229. r := Request{
  230. Conn: c,
  231. Query: d.Fields,
  232. IndexList: []string{d.Index.(string)},
  233. TypeList: []string{d.Type},
  234. ExtraArgs: extraArgs,
  235. method: "POST",
  236. }
  237. if d.Id != nil {
  238. r.method = "PUT"
  239. r.id = d.Id.(string)
  240. }
  241. return r.Run()
  242. }
  243. // Delete deletes a Document d
  244. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  245. // URL arguments, for example, to control routing.
  246. func (c *Connection) Delete(d Document, extraArgs url.Values) (Response, error) {
  247. r := Request{
  248. Conn: c,
  249. IndexList: []string{d.Index.(string)},
  250. TypeList: []string{d.Type},
  251. ExtraArgs: extraArgs,
  252. method: "DELETE",
  253. id: d.Id.(string),
  254. }
  255. return r.Run()
  256. }
  257. // Run executes an elasticsearch Request. It converts data to Json, sends the
  258. // request and return the Response obtained
  259. func (req *Request) Run() (Response, error) {
  260. postData := []byte{}
  261. // XXX : refactor this
  262. if len(req.Body) > 0 {
  263. postData = req.Body
  264. } else if req.api == "_bulk" {
  265. postData = req.bulkData
  266. } else {
  267. b, err := json.Marshal(req.Query)
  268. if err != nil {
  269. return Response{}, err
  270. }
  271. postData = b
  272. }
  273. reader := bytes.NewReader(postData)
  274. newReq, err := http.NewRequest(req.method, req.Url(), reader)
  275. if err != nil {
  276. return Response{}, err
  277. }
  278. if req.method == "POST" || req.method == "PUT" {
  279. newReq.Header.Set("Content-Type", "application/x-www-form-urlencoded")
  280. }
  281. resp, err := req.Conn.Client.Do(newReq)
  282. if err != nil {
  283. return Response{}, err
  284. }
  285. defer resp.Body.Close()
  286. body, err := ioutil.ReadAll(resp.Body)
  287. if err != nil {
  288. return Response{}, err
  289. }
  290. if resp.StatusCode > 201 && resp.StatusCode < 400 {
  291. return Response{}, errors.New(string(body))
  292. }
  293. esResp := new(Response)
  294. if req.method == "HEAD" {
  295. esResp.Status = uint64(resp.StatusCode)
  296. } else {
  297. err = json.Unmarshal(body, &esResp)
  298. if err != nil {
  299. return Response{}, err
  300. }
  301. json.Unmarshal(body, &esResp.Raw)
  302. }
  303. if req.api == "_bulk" && esResp.Errors {
  304. for _, item := range esResp.Items {
  305. for _, i := range item {
  306. if i.Error != "" {
  307. return Response{}, &SearchError{i.Error, i.Status}
  308. }
  309. }
  310. }
  311. return Response{}, &SearchError{Msg: "Unknown error while bulk indexing"}
  312. }
  313. if esResp.Error != "" {
  314. return Response{}, &SearchError{esResp.Error, esResp.Status}
  315. }
  316. return *esResp, nil
  317. }
  318. // Url builds a Request for a URL
  319. func (r *Request) Url() string {
  320. path := "/" + strings.Join(r.IndexList, ",")
  321. if len(r.TypeList) > 0 {
  322. path += "/" + strings.Join(r.TypeList, ",")
  323. }
  324. // XXX : for indexing documents using the normal (non bulk) API
  325. if len(r.id) > 0 {
  326. path += "/" + r.id
  327. }
  328. path += "/" + r.api
  329. u := url.URL{
  330. Scheme: "http",
  331. Host: fmt.Sprintf("%s:%s", r.Conn.Host, r.Conn.Port),
  332. Path: path,
  333. RawQuery: r.ExtraArgs.Encode(),
  334. }
  335. return u.String()
  336. }
  337. // Buckets returns list of buckets in aggregation
  338. func (a Aggregation) Buckets() []Bucket {
  339. result := []Bucket{}
  340. if buckets, ok := a["buckets"]; ok {
  341. for _, bucket := range buckets.([]interface{}) {
  342. result = append(result, bucket.(map[string]interface{}))
  343. }
  344. }
  345. return result
  346. }
  347. // Key returns key for aggregation bucket
  348. func (b Bucket) Key() interface{} {
  349. return b["key"]
  350. }
  351. // DocCount returns count of documents in this bucket
  352. func (b Bucket) DocCount() uint64 {
  353. return uint64(b["doc_count"].(float64))
  354. }
  355. // Aggregation returns aggregation by name from bucket
  356. func (b Bucket) Aggregation(name string) Aggregation {
  357. if agg, ok := b[name]; ok {
  358. return agg.(map[string]interface{})
  359. } else {
  360. return Aggregation{}
  361. }
  362. }
  363. // PutMapping registers a specific mapping for one or more types in one or more indexes
  364. func (c *Connection) PutMapping(typeName string, mapping map[string]interface{}, indexes []string) (Response, error) {
  365. r := Request{
  366. Conn: c,
  367. Query: mapping,
  368. IndexList: indexes,
  369. method: "PUT",
  370. api: "_mappings/" + typeName,
  371. }
  372. return r.Run()
  373. }
  374. func (c *Connection) GetMapping(types []string, indexes []string) (Response, error) {
  375. r := Request{
  376. Conn: c,
  377. IndexList: indexes,
  378. method: "GET",
  379. api: "_mapping/" + strings.Join(types, ","),
  380. }
  381. return r.Run()
  382. }
  383. // IndicesExist checks whether index (or indices) exist on the server
  384. func (c *Connection) IndicesExist(indexes []string) (bool, error) {
  385. r := Request{
  386. Conn: c,
  387. IndexList: indexes,
  388. method: "HEAD",
  389. }
  390. resp, err := r.Run()
  391. return resp.Status == 200, err
  392. }
  393. func (c *Connection) Update(d Document, query map[string]interface{}, extraArgs url.Values) (Response, error) {
  394. r := Request{
  395. Conn: c,
  396. Query: query,
  397. IndexList: []string{d.Index.(string)},
  398. TypeList: []string{d.Type},
  399. ExtraArgs: extraArgs,
  400. method: "POST",
  401. api: "_update",
  402. id: d.Id.(string),
  403. }
  404. return r.Run()
  405. }