golang使用thrift2协议connect hbase

前端之家收集整理的这篇文章主要介绍了golang使用thrift2协议connect hbase,前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

1. 我当前的环境是:

go version

go version go1.6 windows/amd64

hbase(main):001:0> version
1.2.4,r67592f3d062743907f8c5ae00dbbe1ae4f69e5af,Tue Oct 25 18:10:20 CDT 2016

创建测试表 “tb_test”:

hbase(main):003:0*create 'tb_test','cf',SPLITS=>['10','20','30','40']

hbase(main):003:0* desc "tb_test"
Table tb_test is ENABLED
tb_test
COLUMN FAMILIES DESCRIPTION
{NAME => 'cf',BLOOMFILTER => 'ROW',VERSIONS => '5',IN_MEMORY => 'false',KEEP_DELETED_CELLS => 'FALSE',DATA_BLOCK_ENCODING => 'N
ONE',TTL => 'FOREVER',COMPRESSION => 'NONE',MIN_VERSIONS => '0',BLOCKCACHE => 'true',BLOCKSIZE => '65536',REPLICATION_SCOPE =>
'0'}
1 row(s) in 0.6630 seconds

在hadoop集群中启动thrift2 Server:

./hbase-daemon.sh start thrift2

2. 准备golang客户端

2.1. 和python类似,先下载thrift库:到 http://thrift.apache.org/ 下载最新的 0.10 src版本;如果需要以前的版本,也可以到 http://archive.apache.org/dist/thrift 去下载。

2.2. 编译安装:具体有多种方法,Maven,./configure make make install ...

2.3. 生成go代码:thrift -o <output directory{默认在当前目录下生成gen-go}> -gen go {对应版本的hbase源码地址}\src\main\resources\org\apache\hadoop\hbase\thrift2\hbase.thrift

2.4. 将对应版本中golang接口code复制到当前golang安装目录,{$GOROOT}\src或者{$GOPATH}\src...

2.5. 再通过go get(依赖git)获取外部资源 git.apache.org/thrift.git/lib/go/thrift:go get git.apache.org/thrift.git/lib/go/thrift;或者到 https://github.com/apache/thrift 直接下载zip包,将其解压后放在 $GOPATH\src\git.apache.org\thrift.git\ 目录下即可。

3. 编写客户端代码

  1. /*
  2. * @Author: lesorb.cn
  3. * @Date: 2017-03-21 10:41:04
  4. * @Last Modified by: lesorb.cn
  5. * @Last Modified time: 2017-03-21 15:08:27
  6. */
  7. package main
  8. import (
  9. // "encoding/binary"
  10. "fmt"
  11. "git.apache.org/thrift.git/lib/go/thrift"
  12. "hbase"
  13. "net"
  14. "os"
  15. "reflect"
  16. // "strconv"
  17. "time"
  18. )
  19.  
  // Connection settings for the HBase Thrift2 server used by main.
  const (
  	// HOST is the host name passed to net.JoinHostPort when dialing.
  	HOST = "datanode1.hadoop"
  	// PORT is the TCP port passed to net.JoinHostPort when dialing.
  	PORT = "9090"
  	// TESTRECORD is not referenced by the code in this listing.
  	TESTRECORD = 10
  )
  25.  
  26. func main() {
  27. startTime := currentTimeMillis()
  28. logformatstr_ := "----%s\n"
  29. logformatstr := "----%s Cut times :%d-%d=%d MS \n\n"
  30. logformattitle := "create connection "
  31. rowkey := "row_154092606735603"
  32. temptable := "tb_test"
  33. protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()
  34. transport,err := thrift.NewTSocket(net.JoinHostPort(HOST,PORT))
  35. if err != nil {
  36. fmt.Fprintln(os.Stderr,"error resolving address:",err)
  37. os.Exit(1)
  38. }
  39. client := hbase.NewTHBaseServiceClientFactory(transport,protocolFactory)
  40. if err := transport.Open(); err != nil {
  41. fmt.Fprintln(os.Stderr,"Error opening socket to "+HOST+":"+PORT," ",err)
  42. os.Exit(1)
  43. }
  44. tmpendTime := currentTimeMillis()
  45. fmt.Printf(logformatstr,logformattitle,tmpendTime,startTime,(tmpendTime - startTime))
  46. defer transport.Close()
  47.  
  48. //--------------Exists--------------------
  49. logformattitle = " call exist method : "
  50. fmt.Printf(logformatstr_,logformattitle)
  51. tmpstartTime := currentTimeMillis()
  52. //
  53. isexists,err := (client.Exists([]byte(temptable),&hbase.TGet{Row: []byte(rowkey)}))
  54. fmt.Printf("rowkey{%s} in table{%s} Exists:%t\t",rowkey,temptable,isexists)
  55. if err != nil {
  56. fmt.Printf("Exists err:%s\n",err)
  57. }
  58. fmt.Println("")
  59. tmpendTime = currentTimeMillis()
  60. fmt.Printf(logformatstr,tmpstartTime,(tmpendTime - tmpstartTime))
  61.  
  62.  
  63. //------------Get---------------
  64. logformattitle = "call get method to retrieve data:"
  65. fmt.Printf(logformatstr_,logformattitle)
  66. tmpstartTime = currentTimeMillis()
  67. //
  68. result,err := (client.Get([]byte(temptable),&hbase.TGet{Row: []byte(rowkey)}))
  69. if err != nil {
  70. fmt.Printf("Get err:%s\n",err)
  71. } else {
  72. fmt.Println("Rowkey:" + string(result.Row))
  73. for _,cv := range result.ColumnValues {
  74. printStruct(cv)
  75. }
  76. }
  77. tmpendTime = currentTimeMillis()
  78. fmt.Printf(logformatstr,(tmpendTime - tmpstartTime))
  79. //--------------put------------------------
  80. logformattitle = "call Put method to write data : "
  81. rowkey = "row_154092606735604"
  82. fmt.Printf(logformatstr_,logformattitle)
  83. tmpstartTime = currentTimeMillis()
  84. cvarr := []*hbase.TColumnValue{
  85. &hbase.TColumnValue{
  86. Family: []byte("cf"),Qualifier: []byte("title"),Value: []byte("welcome to lesorb.cn")},&hbase.TColumnValue{
  87. Family: []byte("cf"),Qualifier: []byte("content"),Value: []byte("welcome,why are u here!")},Qualifier: []byte("create"),Value: []byte("user5")},Qualifier: []byte("create_time"),Value: []byte("2017-03-21 16:17:26")},Qualifier: []byte("tags"),lesorb")}}
  88. temptput := hbase.TPut{Row: []byte(rowkey),ColumnValues: cvarr}
  89. err = client.Put([]byte(temptable),&temptput)
  90. if err != nil {
  91. fmt.Printf("Put err:%s\n",err)
  92. } else {
  93. fmt.Println("Put done")
  94. }
  95. tmpendTime = currentTimeMillis()
  96. fmt.Printf(logformatstr,(tmpendTime - tmpstartTime))
  97. //------------DeleteSingle------------
  98. logformattitle = "call DeleteSingle method to delete a data: "
  99. fmt.Printf(logformatstr_,logformattitle)
  100. tmpstartTime = currentTimeMillis()
  101. tdelete := hbase.TDelete{Row: []byte(rowkey)}
  102. err = client.DeleteSingle([]byte(temptable),&tdelete)
  103. if err != nil {
  104. fmt.Printf("DeleteSingle err:%s\n",err)
  105. } else {
  106. fmt.Printf("DeleteSingel done\n")
  107. }
  108.  
  109. tmpendTime = currentTimeMillis()
  110. fmt.Printf(logformatstr,(tmpendTime - tmpstartTime))
  111.  
  112. }
  113.  
  114. //struct
  115. func printStruct(cv interface{}) {
  116. switch reflect.ValueOf(cv).Interface().(type) {
  117. case *hbase.TColumnValue:
  118. s := reflect.ValueOf(cv).Elem()
  119. typeOfT := s.Type()
  120. //get Thrift2 field
  121. for i := 0; i < s.NumField(); i++ {
  122. f := s.Field(i)
  123. fileldformatstr := "\t%d: %s(%s)= %v\n"
  124. switch f.Interface().(type) {
  125. case []uint8:
  126. fmt.Printf(fileldformatstr,i,typeOfT.Field(i).Name,f.Type(),string(f.Interface().([]uint8)))
  127. case *int64:
  128. var tempint64 int64
  129. if f.Interface().(*int64) == nil {
  130. tempint64 = 0
  131. } else {
  132. tempint64 = *f.Interface().(*int64)
  133. }
  134. fmt.Printf(fileldformatstr,tempint64)
  135. default:
  136. fmt.Printf("I don't know")
  137. }
  138. }
  139. default:
  140. fmt.Printf("I don't know")
  141. fmt.Print(reflect.ValueOf(cv))
  142. }
  143. }
  144.  
  // currentTimeMillis returns the current wall-clock time as the number of
  // milliseconds elapsed since the Unix epoch.
  func currentTimeMillis() int64 {
  	// time.Millisecond is the number of nanoseconds in one millisecond.
  	return time.Now().UnixNano() / int64(time.Millisecond)
  }


go run hbase_t.go

运行结果如下:

  1. ----create connection Cut times :1490084989059-1490084989042=17 MS
  2.  
  3. ---- call exist method :
  4. rowkey{row_154092606735603} in table{tb_test} Exists:true
  5. ---- call exist method : Cut times :1490084989073-1490084989060=13 MS
  6.  
  7. ----call get method to retrieve data:
  8. Rowkey:row_154092606735603
  9. 0: Family([]uint8)= cf
  10. 1: Qualifier([]uint8)= content
  11. 2: Value([]uint8)= He's full of bad ideas.!
  12. 3: Timestamp(*int64)= 1489489549481
  13. 4: Tags([]uint8)=
  14. 0: Family([]uint8)= cf
  15. 1: Qualifier([]uint8)= create
  16. 2: Value([]uint8)= user4
  17. 3: Timestamp(*int64)= 1489489549481
  18. 4: Tags([]uint8)=
  19. 0: Family([]uint8)= cf
  20. 1: Qualifier([]uint8)= create_time
  21. 2: Value([]uint8)= 2017-03-14 11:04:58
  22. 3: Timestamp(*int64)= 1489489549481
  23. 4: Tags([]uint8)=
  24. 0: Family([]uint8)= cf
  25. 1: Qualifier([]uint8)= title
  26. 2: Value([]uint8)= idea
  27. 3: Timestamp(*int64)= 1489489549481
  28. 4: Tags([]uint8)=
  29. ----call get method to retrieve data: Cut times :1490084989089-1490084989073=16 MS
  30.  
  31. ----call Put method to write data :
  32. Put done
  33. ----call Put method to write data : Cut times :1490084989177-1490084989089=88 MS
  34.  
  35. ----call DeleteSingle method to delete a data:
  36. DeleteSingel done
  37. ----call DeleteSingle method to delete a data: Cut times :1490084989195-1490084989178=17 MS

相关code参见我的下载链接

http://download.csdn.net/detail/lesorb/9788720

猜你在找的Go相关文章