You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

goes.go 13 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
9 years ago
9 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555
  1. // Copyright 2013 Belogik. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package goes provides an API to access Elasticsearch.
  5. package goes
  6. import (
  7. "bytes"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "io/ioutil"
  12. "net/http"
  13. "net/url"
  14. "reflect"
  15. "strconv"
  16. "strings"
  17. )
  // BULK_COMMAND_INDEX is the "index" bulk action name.
// NOTE(review): presumably meant as a value for Document.BulkCommand, which
// BulkSend uses as the action key of each bulk line — confirm against callers.
const BULK_COMMAND_INDEX = "index"

// BULK_COMMAND_DELETE is the "delete" bulk action name.
// NOTE(review): presumably meant as a value for Document.BulkCommand — confirm.
const BULK_COMMAND_DELETE = "delete"
  22. func (err *SearchError) Error() string {
  23. return fmt.Sprintf("[%d] %s", err.StatusCode, err.Msg)
  24. }
  25. // NewConnection initiates a new Connection to an elasticsearch server
  26. //
  27. // This function is pretty useless for now but might be useful in a near future
  28. // if wee need more features like connection pooling or load balancing.
  29. func NewConnection(host string, port string) *Connection {
  30. return &Connection{host, port, http.DefaultClient}
  31. }
  32. func (c *Connection) WithClient(cl *http.Client) *Connection {
  33. c.Client = cl
  34. return c
  35. }
  36. // CreateIndex creates a new index represented by a name and a mapping
  37. func (c *Connection) CreateIndex(name string, mapping map[string]interface{}) (Response, error) {
  38. r := Request{
  39. Conn: c,
  40. Query: mapping,
  41. IndexList: []string{name},
  42. method: "PUT",
  43. }
  44. return r.Run()
  45. }
  46. // DeleteIndex deletes an index represented by a name
  47. func (c *Connection) DeleteIndex(name string) (Response, error) {
  48. r := Request{
  49. Conn: c,
  50. IndexList: []string{name},
  51. method: "DELETE",
  52. }
  53. return r.Run()
  54. }
  55. // RefreshIndex refreshes an index represented by a name
  56. func (c *Connection) RefreshIndex(name string) (Response, error) {
  57. r := Request{
  58. Conn: c,
  59. IndexList: []string{name},
  60. method: "POST",
  61. api: "_refresh",
  62. }
  63. return r.Run()
  64. }
  65. // Optimize an index represented by a name, extra args are also allowed please check:
  66. // http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-optimize.html#indices-optimize
  67. func (c *Connection) Optimize(indexList []string, extraArgs url.Values) (Response, error) {
  68. r := Request{
  69. Conn: c,
  70. IndexList: indexList,
  71. ExtraArgs: extraArgs,
  72. method: "POST",
  73. api: "_optimize",
  74. }
  75. return r.Run()
  76. }
  77. // Stats fetches statistics (_stats) for the current elasticsearch server
  78. func (c *Connection) Stats(indexList []string, extraArgs url.Values) (Response, error) {
  79. r := Request{
  80. Conn: c,
  81. IndexList: indexList,
  82. ExtraArgs: extraArgs,
  83. method: "GET",
  84. api: "_stats",
  85. }
  86. return r.Run()
  87. }
  88. // IndexStatus fetches the status (_status) for the indices defined in
  89. // indexList. Use _all in indexList to get stats for all indices
  90. func (c *Connection) IndexStatus(indexList []string) (Response, error) {
  91. r := Request{
  92. Conn: c,
  93. IndexList: indexList,
  94. method: "GET",
  95. api: "_status",
  96. }
  97. return r.Run()
  98. }
  99. // Bulk adds multiple documents in bulk mode
  100. func (c *Connection) BulkSend(documents []Document) (Response, error) {
  101. // We do not generate a traditional JSON here (often a one liner)
  102. // Elasticsearch expects one line of JSON per line (EOL = \n)
  103. // plus an extra \n at the very end of the document
  104. //
  105. // More informations about the Bulk JSON format for Elasticsearch:
  106. //
  107. // - http://www.elasticsearch.org/guide/reference/api/bulk.html
  108. //
  109. // This is quite annoying for us as we can not use the simple JSON
  110. // Marshaler available in Run().
  111. //
  112. // We have to generate this special JSON by ourselves which leads to
  113. // the code below.
  114. //
  115. // I know it is unreadable I must find an elegant way to fix this.
  116. // len(documents) * 2 : action + optional_sources
  117. // + 1 : room for the trailing \n
  118. bulkData := make([][]byte, len(documents)*2+1)
  119. i := 0
  120. for _, doc := range documents {
  121. action, err := json.Marshal(map[string]interface{}{
  122. doc.BulkCommand: map[string]interface{}{
  123. "_index": doc.Index,
  124. "_type": doc.Type,
  125. "_id": doc.Id,
  126. },
  127. })
  128. if err != nil {
  129. return Response{}, err
  130. }
  131. bulkData[i] = action
  132. i++
  133. if doc.Fields != nil {
  134. if docFields, ok := doc.Fields.(map[string]interface{}); ok {
  135. if len(docFields) == 0 {
  136. continue
  137. }
  138. } else {
  139. typeOfFields := reflect.TypeOf(doc.Fields)
  140. if typeOfFields.Kind() == reflect.Ptr {
  141. typeOfFields = typeOfFields.Elem()
  142. }
  143. if typeOfFields.Kind() != reflect.Struct {
  144. return Response{}, fmt.Errorf("Document fields not in struct or map[string]interface{} format")
  145. }
  146. if typeOfFields.NumField() == 0 {
  147. continue
  148. }
  149. }
  150. sources, err := json.Marshal(doc.Fields)
  151. if err != nil {
  152. return Response{}, err
  153. }
  154. bulkData[i] = sources
  155. i++
  156. }
  157. }
  158. // forces an extra trailing \n absolutely necessary for elasticsearch
  159. bulkData[len(bulkData)-1] = []byte(nil)
  160. r := Request{
  161. Conn: c,
  162. method: "POST",
  163. api: "_bulk",
  164. bulkData: bytes.Join(bulkData, []byte("\n")),
  165. }
  166. return r.Run()
  167. }
  168. // Search executes a search query against an index
  169. func (c *Connection) Search(query map[string]interface{}, indexList []string, typeList []string, extraArgs url.Values) (Response, error) {
  170. r := Request{
  171. Conn: c,
  172. Query: query,
  173. IndexList: indexList,
  174. TypeList: typeList,
  175. method: "POST",
  176. api: "_search",
  177. ExtraArgs: extraArgs,
  178. }
  179. return r.Run()
  180. }
  181. // Count executes a count query against an index, use the Count field in the response for the result
  182. func (c *Connection) Count(query map[string]interface{}, indexList []string, typeList []string, extraArgs url.Values) (Response, error) {
  183. r := Request{
  184. Conn: c,
  185. Query: query,
  186. IndexList: indexList,
  187. TypeList: typeList,
  188. method: "POST",
  189. api: "_count",
  190. ExtraArgs: extraArgs,
  191. }
  192. return r.Run()
  193. }
  194. //Query runs a query against an index using the provided http method.
  195. //This method can be used to execute a delete by query, just pass in "DELETE"
  196. //for the HTTP method.
  197. func (c *Connection) Query(query map[string]interface{}, indexList []string, typeList []string, httpMethod string, extraArgs url.Values) (Response, error) {
  198. r := Request{
  199. Conn: c,
  200. Query: query,
  201. IndexList: indexList,
  202. TypeList: typeList,
  203. method: httpMethod,
  204. api: "_query",
  205. ExtraArgs: extraArgs,
  206. }
  207. return r.Run()
  208. }
  209. // Scan starts scroll over an index
  210. func (c *Connection) Scan(query map[string]interface{}, indexList []string, typeList []string, timeout string, size int) (Response, error) {
  211. v := url.Values{}
  212. v.Add("search_type", "scan")
  213. v.Add("scroll", timeout)
  214. v.Add("size", strconv.Itoa(size))
  215. r := Request{
  216. Conn: c,
  217. Query: query,
  218. IndexList: indexList,
  219. TypeList: typeList,
  220. method: "POST",
  221. api: "_search",
  222. ExtraArgs: v,
  223. }
  224. return r.Run()
  225. }
  226. // Scroll fetches data by scroll id
  227. func (c *Connection) Scroll(scrollId string, timeout string) (Response, error) {
  228. v := url.Values{}
  229. v.Add("scroll", timeout)
  230. r := Request{
  231. Conn: c,
  232. method: "POST",
  233. api: "_search/scroll",
  234. ExtraArgs: v,
  235. Body: []byte(scrollId),
  236. }
  237. return r.Run()
  238. }
  239. // Get a typed document by its id
  240. func (c *Connection) Get(index string, documentType string, id string, extraArgs url.Values) (Response, error) {
  241. r := Request{
  242. Conn: c,
  243. IndexList: []string{index},
  244. method: "GET",
  245. api: documentType + "/" + id,
  246. ExtraArgs: extraArgs,
  247. }
  248. return r.Run()
  249. }
  250. // Index indexes a Document
  251. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  252. // URL arguments, for example, to control routing, ttl, version, op_type, etc.
  253. func (c *Connection) Index(d Document, extraArgs url.Values) (Response, error) {
  254. r := Request{
  255. Conn: c,
  256. Query: d.Fields,
  257. IndexList: []string{d.Index.(string)},
  258. TypeList: []string{d.Type},
  259. ExtraArgs: extraArgs,
  260. method: "POST",
  261. }
  262. if d.Id != nil {
  263. r.method = "PUT"
  264. r.id = d.Id.(string)
  265. }
  266. return r.Run()
  267. }
  268. // Delete deletes a Document d
  269. // The extraArgs is a list of url.Values that you can send to elasticsearch as
  270. // URL arguments, for example, to control routing.
  271. func (c *Connection) Delete(d Document, extraArgs url.Values) (Response, error) {
  272. r := Request{
  273. Conn: c,
  274. IndexList: []string{d.Index.(string)},
  275. TypeList: []string{d.Type},
  276. ExtraArgs: extraArgs,
  277. method: "DELETE",
  278. id: d.Id.(string),
  279. }
  280. return r.Run()
  281. }
  282. // Run executes an elasticsearch Request. It converts data to Json, sends the
  283. // request and return the Response obtained
  284. func (req *Request) Run() (Response, error) {
  285. postData := []byte{}
  286. // XXX : refactor this
  287. if len(req.Body) > 0 {
  288. postData = req.Body
  289. } else if req.api == "_bulk" {
  290. postData = req.bulkData
  291. } else {
  292. b, err := json.Marshal(req.Query)
  293. if err != nil {
  294. return Response{}, err
  295. }
  296. postData = b
  297. }
  298. reader := bytes.NewReader(postData)
  299. newReq, err := http.NewRequest(req.method, req.Url(), reader)
  300. if err != nil {
  301. return Response{}, err
  302. }
  303. if req.method == "POST" || req.method == "PUT" {
  304. newReq.Header.Set("Content-Type", "application/x-www-form-urlencoded")
  305. }
  306. resp, err := req.Conn.Client.Do(newReq)
  307. if err != nil {
  308. return Response{}, err
  309. }
  310. defer resp.Body.Close()
  311. body, err := ioutil.ReadAll(resp.Body)
  312. if err != nil {
  313. return Response{}, err
  314. }
  315. if resp.StatusCode > 201 && resp.StatusCode < 400 {
  316. return Response{}, errors.New(string(body))
  317. }
  318. esResp := new(Response)
  319. if req.method == "HEAD" {
  320. esResp.Status = uint64(resp.StatusCode)
  321. } else {
  322. err = json.Unmarshal(body, &esResp)
  323. if err != nil {
  324. return Response{}, err
  325. }
  326. json.Unmarshal(body, &esResp.Raw)
  327. }
  328. if req.api == "_bulk" && esResp.Errors {
  329. for _, item := range esResp.Items {
  330. for _, i := range item {
  331. if i.Error != "" {
  332. return Response{}, &SearchError{i.Error, i.Status}
  333. }
  334. }
  335. }
  336. return Response{}, &SearchError{Msg: "Unknown error while bulk indexing"}
  337. }
  338. if esResp.Error != "" {
  339. return Response{}, &SearchError{esResp.Error, esResp.Status}
  340. }
  341. return *esResp, nil
  342. }
  343. // Url builds a Request for a URL
  344. func (r *Request) Url() string {
  345. path := "/" + strings.Join(r.IndexList, ",")
  346. if len(r.TypeList) > 0 {
  347. path += "/" + strings.Join(r.TypeList, ",")
  348. }
  349. // XXX : for indexing documents using the normal (non bulk) API
  350. if len(r.id) > 0 {
  351. path += "/" + r.id
  352. }
  353. path += "/" + r.api
  354. u := url.URL{
  355. Scheme: "http",
  356. Host: fmt.Sprintf("%s:%s", r.Conn.Host, r.Conn.Port),
  357. Path: path,
  358. RawQuery: r.ExtraArgs.Encode(),
  359. }
  360. return u.String()
  361. }
  362. // Buckets returns list of buckets in aggregation
  363. func (a Aggregation) Buckets() []Bucket {
  364. result := []Bucket{}
  365. if buckets, ok := a["buckets"]; ok {
  366. for _, bucket := range buckets.([]interface{}) {
  367. result = append(result, bucket.(map[string]interface{}))
  368. }
  369. }
  370. return result
  371. }
  372. // Key returns key for aggregation bucket
  373. func (b Bucket) Key() interface{} {
  374. return b["key"]
  375. }
  376. // DocCount returns count of documents in this bucket
  377. func (b Bucket) DocCount() uint64 {
  378. return uint64(b["doc_count"].(float64))
  379. }
  380. // Aggregation returns aggregation by name from bucket
  381. func (b Bucket) Aggregation(name string) Aggregation {
  382. if agg, ok := b[name]; ok {
  383. return agg.(map[string]interface{})
  384. } else {
  385. return Aggregation{}
  386. }
  387. }
  388. // PutMapping registers a specific mapping for one or more types in one or more indexes
  389. func (c *Connection) PutMapping(typeName string, mapping map[string]interface{}, indexes []string) (Response, error) {
  390. r := Request{
  391. Conn: c,
  392. Query: mapping,
  393. IndexList: indexes,
  394. method: "PUT",
  395. api: "_mappings/" + typeName,
  396. }
  397. return r.Run()
  398. }
  399. func (c *Connection) GetMapping(types []string, indexes []string) (Response, error) {
  400. r := Request{
  401. Conn: c,
  402. IndexList: indexes,
  403. method: "GET",
  404. api: "_mapping/" + strings.Join(types, ","),
  405. }
  406. return r.Run()
  407. }
  408. // IndicesExist checks whether index (or indices) exist on the server
  409. func (c *Connection) IndicesExist(indexes []string) (bool, error) {
  410. r := Request{
  411. Conn: c,
  412. IndexList: indexes,
  413. method: "HEAD",
  414. }
  415. resp, err := r.Run()
  416. return resp.Status == 200, err
  417. }
  418. func (c *Connection) Update(d Document, query map[string]interface{}, extraArgs url.Values) (Response, error) {
  419. r := Request{
  420. Conn: c,
  421. Query: query,
  422. IndexList: []string{d.Index.(string)},
  423. TypeList: []string{d.Type},
  424. ExtraArgs: extraArgs,
  425. method: "POST",
  426. api: "_update",
  427. }
  428. if d.Id != nil {
  429. r.id = d.Id.(string)
  430. }
  431. return r.Run()
  432. }
  433. // DeleteMapping deletes a mapping along with all data in the type
  434. func (c *Connection) DeleteMapping(typeName string, indexes []string) (Response, error) {
  435. r := Request{
  436. Conn: c,
  437. IndexList: indexes,
  438. method: "DELETE",
  439. api: "_mappings/" + typeName,
  440. }
  441. return r.Run()
  442. }
  443. // AddAlias creates an alias to one or more indexes
  444. func (c *Connection) AddAlias(alias string, indexes []string) (Response, error) {
  445. command := map[string]interface{}{
  446. "actions": make([]map[string]interface{}, 1),
  447. }
  448. for _, index := range indexes {
  449. command["actions"] = append(command["actions"].([]map[string]interface{}), map[string]interface{}{
  450. "add": map[string]interface{}{
  451. "index": index,
  452. "alias": alias,
  453. },
  454. })
  455. }
  456. r := Request{
  457. Conn: c,
  458. Query: command,
  459. method: "POST",
  460. api: "_aliases",
  461. }
  462. return r.Run()
  463. }