golang使用thrift2协议connect hbase

1. 我当前的环境是:

go version

go version go1.6 windows/amd64

hbase(main):001:0> version
1.2.4,r67592f3d062743907f8c5ae00dbbe1ae4f69e5af,Tue Oct 25 18:10:20 CDT 2016

创建,测试表 “tb_test”

hbase(main):003:0*create 'tb_test','cf',SPLITS=>['10','20','30','40']

hbase(main):003:0* desc "tb_test"
Table tb_test is ENABLED
tb_test
COLUMN FAMILIES DESCRIPTION
{NAME => 'cf',BLOOMFILTER => 'ROW',VERSIONS => '5',IN_MEMORY => 'false',KEEP_DELETED_CELLS => 'FALSE',DATA_BLOCK_ENCODING => 'N
ONE',TTL => 'FOREVER',COMPRESSION => 'NONE',MIN_VERSIONS => '0',BLOCKCACHE => 'true',BLOCKSIZE => '65536',REPLICATION_SCOPE =>
'0'}
1 row(s) in 0.6630 seconds

在hadoop集群中启动thrfit2 Server:

./hbase-daemon.sh start thrift2

2. 准备golang客户端

2.1. 和python类似,下载thrift2库:http://thrift.apache.org/ 下载最新的 0.10 src版本,如果要以前的版本,也可到 http://archive.apache.org/dist/thrift 去下载 。

2.2. 编译安装:具体有多种方法,Maven,./configure make make install ...

2.3. 生成go代码:thrift -o <output directory{默认当前目录gen-py}> -gen go {对应版本的hbase源码地址}\src\main\resources\org\apache\Hadoop\hbase\thrift2

2.4. 将对应版本中golang接口code复制到当前golang安装目录,{$GOROOT}\src或者{$GOPATH}\src...

2.5. 再通过git获取外部资源git.apache.org/thrift.git/lib/go/thrift : go getgit.apache.org/thrift.git/lib/go/thrift 或者直接到https://github.com/apache/thrift 直接下载zip包,将至放在$GPPATH\src\git.apache.org\thrift.git\ 目录下即可。

3. 编写客户端代码

/*
* @Author: lesorb.cn
* @Date:   2017-03-21 10:41:04
* @Last Modified by:   lesorb.cn
* @Last Modified time: 2017-03-21 15:08:27
*/
package main
import (
   // "encoding/binary"
    "fmt"
    "git.apache.org/thrift.git/lib/go/thrift"
    "hbase"
    "net"
    "os"
    "reflect"
//    "strconv"
    "time"
)

const (
    HOST       = "datanode1.hadoop"
    PORT       = "9090"
    TESTRECORD = 10
)

func main() {
    startTime := currentTimeMillis()
    logformatstr_ := "----%s\n"
    logformatstr := "----%s Cut times :%d-%d=%d MS \n\n"
    logformattitle := "create connection "
    rowkey := "row_154092606735603"
    temptable := "tb_test"
    protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()
    transport,err := thrift.NewTSocket(net.JoinHostPort(HOST,PORT))
    if err != nil {
        fmt.Fprintln(os.Stderr,"error resolving address:",err)
        os.Exit(1)
    }
    client := hbase.NewTHBaseServiceClientFactory(transport,protocolFactory)
    if err := transport.Open(); err != nil {
        fmt.Fprintln(os.Stderr,"Error opening socket to "+HOST+":"+PORT," ",err)
        os.Exit(1)
    }
    tmpendTime := currentTimeMillis()
    fmt.Printf(logformatstr,logformattitle,tmpendTime,startTime,(tmpendTime - startTime))
    defer transport.Close()

    //--------------Exists--------------------
    logformattitle =  " call exist method : "
    fmt.Printf(logformatstr_,logformattitle)
    tmpstartTime := currentTimeMillis()
    //
    isexists,err := (client.Exists([]byte(temptable),&hbase.TGet{Row: []byte(rowkey)}))
    fmt.Printf("rowkey{%s} in table{%s} Exists:%t\t",rowkey,temptable,isexists)
    if err != nil {
        fmt.Printf("Exists err:%s\n",err)
    }
    fmt.Println("")
    tmpendTime = currentTimeMillis()
    fmt.Printf(logformatstr,tmpstartTime,(tmpendTime - tmpstartTime))


    //------------Get---------------
    logformattitle = "call get method to retrieve data:"
    fmt.Printf(logformatstr_,logformattitle)
    tmpstartTime = currentTimeMillis()
    //
    result,err := (client.Get([]byte(temptable),&hbase.TGet{Row: []byte(rowkey)}))
    if err != nil {
        fmt.Printf("Get err:%s\n",err)
    } else {
        fmt.Println("Rowkey:" + string(result.Row))
        for _,cv := range result.ColumnValues {
            printStruct(cv)
        }
    }
    tmpendTime = currentTimeMillis()
    fmt.Printf(logformatstr,(tmpendTime - tmpstartTime))
	
    //--------------put------------------------
    logformattitle = "call Put method to write data : "
    rowkey = "row_154092606735604"
    fmt.Printf(logformatstr_,logformattitle)
    tmpstartTime = currentTimeMillis()
    cvarr := []*hbase.TColumnValue{
        &hbase.TColumnValue{
            Family:    []byte("cf"),Qualifier: []byte("title"),Value:     []byte("welcome to lesorb.cn")},&hbase.TColumnValue{
            Family:    []byte("cf"),Qualifier: []byte("content"),Value:     []byte("welcome,why are u here!")},Qualifier: []byte("create"),Value:     []byte("user5")},Qualifier: []byte("create_time"),Value:     []byte("2017-03-21 16:17:26")},Qualifier: []byte("tags"),lesorb")}}
    temptput := hbase.TPut{Row: []byte(rowkey),ColumnValues: cvarr}
    err = client.Put([]byte(temptable),&temptput)
    if err != nil {
        fmt.Printf("Put err:%s\n",err)
    } else {
        fmt.Println("Put done")
    }
    tmpendTime = currentTimeMillis()
    fmt.Printf(logformatstr,(tmpendTime - tmpstartTime))
	
    //------------DeleteSingle------------
    logformattitle = "call DeleteSingle method to delete a data: "
    fmt.Printf(logformatstr_,logformattitle)
    tmpstartTime = currentTimeMillis()
    tdelete := hbase.TDelete{Row: []byte(rowkey)}
    err = client.DeleteSingle([]byte(temptable),&tdelete)
    if err != nil {
        fmt.Printf("DeleteSingle err:%s\n",err)
    } else {
        fmt.Printf("DeleteSingel done\n")
    }

    tmpendTime = currentTimeMillis()
    fmt.Printf(logformatstr,(tmpendTime - tmpstartTime))

}

//struct
func printStruct(cv interface{}) {
    switch reflect.ValueOf(cv).Interface().(type) {
    case *hbase.TColumnValue:
        s := reflect.ValueOf(cv).Elem()
        typeOfT := s.Type()
        //get Thrift2 field
        for i := 0; i < s.NumField(); i++ {
            f := s.Field(i)
            fileldformatstr := "\t%d: %s(%s)= %v\n"
            switch f.Interface().(type) {
            case []uint8:
                fmt.Printf(fileldformatstr,i,typeOfT.Field(i).Name,f.Type(),string(f.Interface().([]uint8)))
            case *int64:
                var tempint64 int64
                if f.Interface().(*int64) == nil {
                    tempint64 = 0
                } else {
                    tempint64 = *f.Interface().(*int64)
                }
                fmt.Printf(fileldformatstr,tempint64)
            default:
                fmt.Printf("I don't know")
            }
        }
    default:
        fmt.Printf("I don't know")
        fmt.Print(reflect.ValueOf(cv))
    }
}

func currentTimeMillis() int64 {
    return time.Now().UnixNano() / 1000000
}


go run hbase_t.go

运行结果如下:

----create connection  Cut times :1490084989059-1490084989042=17 MS

---- call exist method :
rowkey{row_154092606735603} in table{tb_test} Exists:true
---- call exist method :  Cut times :1490084989073-1490084989060=13 MS

----call get method to retrieve data:
Rowkey:row_154092606735603
        0: Family([]uint8)= cf
        1: Qualifier([]uint8)= content
        2: Value([]uint8)= He's full of bad ideas.!
        3: Timestamp(*int64)= 1489489549481
        4: Tags([]uint8)=
        0: Family([]uint8)= cf
        1: Qualifier([]uint8)= create
        2: Value([]uint8)= user4
        3: Timestamp(*int64)= 1489489549481
        4: Tags([]uint8)=
        0: Family([]uint8)= cf
        1: Qualifier([]uint8)= create_time
        2: Value([]uint8)= 2017-03-14 11:04:58
        3: Timestamp(*int64)= 1489489549481
        4: Tags([]uint8)=
        0: Family([]uint8)= cf
        1: Qualifier([]uint8)= title
        2: Value([]uint8)= idea
        3: Timestamp(*int64)= 1489489549481
        4: Tags([]uint8)=
----call get method to retrieve data: Cut times :1490084989089-1490084989073=16 MS

----call Put method to write data :
Put done
----call Put method to write data :  Cut times :1490084989177-1490084989089=88 MS

----call DeleteSingle method to delete a data:
DeleteSingel done
----call DeleteSingle method to delete a data:  Cut times :1490084989195-1490084989178=17 MS

相关code参加我的下载链接:

http://download.csdn.net/detail/lesorb/9788720

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


简介 WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket让客户端和服务端之间的数据交换变得非常简单,且允许服务器主动向客户端推送数据,并且之后客户端和服务端所有的通信都依靠这个专用协议进行。 在websocket出现之前,一些网站为了实现消息的推送,采用最多的技术是A
使用gin框架编写服务端应用,配置路由接收websocket请求并处理。同时实现一个websocket命令行客户端用于与服务端通信。
## 前言 linux自带的crontab默认情况下只能精确到分钟,没法执行秒级任务。当然,也不是不行,比如: ```shell * * * * * for i in $(seq 1 11);do echo hello &gt;&gt; /home/heruos/tmp.txt;sleep 5;do
前言 代码参考自《Building Distributed Application in Gin》 需求:设计一个食谱相关的API,数据存放到切片中。 设计模型和API 模型 type Recipe struct { // 菜品名 Name string `json:&quot;name&quot;
前言 通过钉钉群机器人的webhook,实现消息推送。 本文代码仅示例markdown格式的消息。 示例代码 注意修改钉钉机器人的webhook package main import ( &quot;bytes&quot; &quot;encoding/json&quot; &quot;fmt&q
golang-jwt是go语言中用来生成和解析jwt的一个第三方库,早先版本也叫jwt-go。本文中使用目前最新的v5版本。
## 前言 假设gRPC服务端的主机名为`qw.er.com`,需要为gRPC服务端和客户端之间的通信配置tls双向认证加密。 ## 生成证书 1. 生成ca根证书。生成过程会要求填写密码、CN、ON、OU等信息,记住密码。 ```shell openssl req -x509 -newkey rs
前言 在go语言中,因为字符串只能被访问,不能被修改,所以进行字符串拼接的时候,golang都需要进行内存拷贝,造成一定的性能消耗。 方式1:操作符 + 特点:简单,可读性良好。每次拼接都会产生内存拷贝,性能一般。仅适用于字符串类型的变量。 示例代码: str1 := &quot;hello &qu
前言 正常情况下,主协程一旦退出,其子协程也会全部中止并退出。为了阻塞主协程,可以使用time.Sleep(),也可以使用WaitGroup。 用法说明 // 导入sync import &quot;sync&quot; // 定义一个sync.WaitGroup var wg sync.WaitG
前言 方便在内网环境中获取服务器本机IP,省了在脚本中过滤ip或ifconfig的结果。 如果内网中有nginx的话,通过nginx获取本机IP也很方便,可参考 借助nginx自动获取本机IP 示例代码 package main import ( &quot;fmt&quot; &quot;net&
简介 logrus是一个第三方日志库,性能虽不如zap和zerolog,但方便易用灵活。logrus完全兼容标准的log库,还支持文本、JSON两种日志输出格式。 特点 相较于标准库,logrus有更细致的日志级别,从高到低分别是:trace &gt; debug &gt; info &gt; wa
基于Gin框架编写的Web API,实现简单的CRUD功能,数据存放在MongoDB,并设置Redis缓存。
## 简介 借助 `github.com/hpcloud/tail` ,可以实时追踪文件变更,达到类似shell命令`tail -f`的效果。 ## 示例代码 以下示例代码用于实时读取nginx的`access.log`日志文件,读取到后输出到控制台。如果nginx日志做了json格式化,还可以解析
前言 go在操作MySQL时,可以使用ORM(比如gorm、xorm),也可以使用原生sql。本文以使用sqlx为例,简单记录步骤。 go version: 1.16 安装相关库 # mysql驱动 go get github.com/go-sql-driver/mysql # 基于MySQL驱动的
前言 某次在客户内网传输数据的时候,防火墙拦截了SSH的数据包,导致没法使用scp命令传输文件,tcp协议和http协议也只放开了指定端口,因此想了个用http传输的“曲线救国”方案。 假设要从192.168.1.23传输到192.168.2.34,因防火墙限制,只能从1.23访问2.34,不能从2
前言 go version: 1.18 本文主要包含JSON、Form、Uri、XML的数据解析与绑定。 JSON数据解析与绑定 go代码 package main import ( &quot;net/http&quot; &quot;github.com/gin-gonic/gin&quot;
## 前言 假设一个场景,服务端部署在内网,客户端需要通过暴露在公网的nginx与服务端进行通信。为了避免在公网进行 http 明文通信造成的信息泄露,nginx与客户端之间的通信应当使用 https 协议,并且nginx也要验证客户端的身份,也就是mTLS双向加密认证通信。 这条通信链路有三个角色
使用gin框架编写web程序作为alertmanager的webhook receiver,解析数据并发送到钉钉
前言 多阶段封装docker镜像,使用scratch镜像,尽量减小镜像包的体积。 封装用于编译的go镜像 Dockerfile FROM golang:1.20.1 AS builder WORKDIR /apps COPY . /apps/ ENV CGO_ENABLED=0 ENV GOOS=l
前言 标准库strconv提供了字符串类型与其他常用数据类型之间的转换。 strconv.FormatX()用于X类型转字符串,如strconv.FormatFloat()用于浮点型转字符串。 strconv.ParseX()用于字符串转X类型,如strconv.ParseFloat()用于字符串转