获取客户端
项目地址:Pegasus scala client 下载:
git clone git@github.com:apache/incubator-pegasus.git
cd incubator-pegasus/scala-client
选择所使用的版本并构建,建议使用master版本。同时注意,scala客户端构建依赖Java客户端,请参考获取Java客户端在项目中添加Java依赖。你可以打包成Jar包进行使用:
sbt package
或者,安装到本地的sbt repository,方便在sbt项目中使用:
sbt publish-local
或者,安装到本地的maven repository:
sbt publish-m2
项目默认使用scala-2.11进行构建,打包发布时则同时发布2.11版本(pegasus-scala-client_2.11)和2.12版本(pegasus-scala-client_2.12),如果你的项目使用sbt构建,则可配置为:
//使用sbt仓库,不需要添加后缀,默认使用当前scala版本号,即使用2.12
scalaVersion := "2.12.8"
libraryDependencies ++= Seq(
"com.xiaomi.infra" %% "pegasus-scala-client" % "1.11.4-1-SNAPSHOT"
)
或者配置为:
//使用maven仓库(你可以使用resolvers ++= Seq()添加自定义maven仓库),需要添加后缀
scalaVersion := "2.12.8"
libraryDependencies ++= Seq(
"com.xiaomi.infra" % "pegasus-scala-client_2.11" % "1.11.4-1-SNAPSHOT"
)
如果你的项目通过maven构建,则可通过maven配置在项目中使用,例如:
<dependency>
<groupId>com.xiaomi.infra</groupId>
<artifactId>pegasus-scala-client_2.11</artifactId>
<version>1.11.4-1</version>
</dependency>
使用客户端
获取实例
通过指定server配置信息获取实例,Scala提供两种获取实例的接口:
1、文件路径作为配置参数: 参见Java客户端文件配置
def createClient(configPath: String): ScalaPegasusClient
例如:
val pegasusClient = ScalaPegasusClientFactory.createClient("resource:///pegasus.properties")
2、Properties对象作为配置:
def createClient(props: Properties): ScalaPegasusClient
例如:
Properties pegasusConfig = new Properties();
pegasusConfig.setProperty("meta_servers", "127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603");
pegasusConfig.setProperty("operation_timeout", 100);
val pegasusClient = ScalaPegasusClientFactory.createClient(pegasusConfig)
数据操作
注意:调用函数前请确认导入Serializers._
,详情参阅实现原理
val hashKey = 12345L
pegasusClient.set(table, hashKey, "sort_1", "value_1")
val value = pegasusClient.get(table, hashKey, "sort_1").as[String]
pegasusClient.del(table, hashKey, "sort_1")
pegasusClient.exists(table, hashKey, "sort_1")
pegasusClient.sortKeyCount(table, hashKey)
pegasusClient.close
接口定义
scala的客户端类地址为:com.xiaomi.infra.pegasus.scalaclient
,主要包括以下四个类:
类名 | 功能 |
---|---|
ScalaPegasusClientFactory | Client工厂类,用于创建Client实例 |
ScalaPegasusClient | Client类,封装了各种同步API,也可用于创建Table实例 |
ScalaPegasusTable | Table类,封装了操作单个Table数据的同步API |
ScalaPegasusAsyncTable | Table类,封装了操作单个Table数据的异步API |
用户可以选择使用Client类(ScalaPegasusClient)或者是Table类(ScalaPegasusTable或者ScalaPegasusAsyncTable)存取数据,区别如下:
- Client类直接在参数中指定表名,省去了打开表的动作,使用更便捷。
- Table类同时支持同步和异步API,而Client类只支持同步API。
- Table类可以为每个操作设置单独的超时,而Client类无法单独指定超时,只能使用配置文件中的默认超时。
- Table类的超时更准确,而Client类在首次读写请求时可能需要在内部初始化Table对象,所以首次读写的超时可能不太准确。
ScalaPegasusClient接口
实现原理
ScalaPegasusClient
接口通过持有ScalaPegasusTable
实现对特定表的访问,而ScalaPegasusTable
实际是封装了Java client的接口PegasusTableInterface
而实现的。函数形式如下所示:def get[H, S](table: String, hashKey: H, sortKey: S)(implicit hSer: SER[H], sSer: SER[S]) = { getTable(table).get(hashKey, sortKey) }
每一个数据表的操作函数都被定义为泛型函数,参数列表
(table: String, hashKey: H, sortKey: S)
是实际传入的参数,同时使用隐式参数(implicit hSer: SER[H], sSer: SER[S])
完成对参数列表(table: String, hashKey: H, sortKey: S)
泛型的转换。其中SER[H]是类Serializers
的泛型声明,该类包含对不同泛型对象的隐式转换函数(转换成Java client中PegasusTableInterface
的byte[]参数
,在scala中对应为Array[Byte]
,例子展示的是当泛型在使用的时候被定义为String
时的隐式转换函数:implicit object Utf8String extends Serializer[String] { override def serialize(obj: String): Array[Byte] = if (obj == null) null else obj.getBytes("UTF-8") override def deserialize(bytes: Array[Byte]): String = if (bytes == null) null else new String(bytes, "UTF-8") }
客户端在调用
ScalaPegasusClient
提供的方法时,当对第一个参数列表的泛型参数传入String
类型变量的时候,将被自动转换为Array[Byte]
类型变量,并传入PegasusTableInterface
的对应方法中。请确保包含Serializers._
,否则无法完成参数的类型转换,你可以使用:import com.xiaomi.infra.pegasus.scalaclient.Serializers._
导入依赖,目前接受的自动类型转换包括
String
、Boolean
、Int
、Long
、Short
、Double
,这些类型可自动转换为Array[Byte]
。API功能
exists
判断key是否存在,参见Java客户端文档#exist
def exists[H, S](table: String, hashKey: H, sortKey: S)
table:表名,通常为
String
类型
hashKey:通常为String
类型
sortKey:通常为String
类型
return: 返回是否存在,boolean
类型sortKeyCount
获取一个hashkey下的sortkey值,参见Java客户端文档#sortKeyCount
def sortKeyCount[H](table: String, hashKey: H)
table:表名,通常为
String
类型
hashKey:通常为String
类型
return:返回sortKeys个数,long
类型get
获取一条数据,参见Java客户端文档#get
def get[H, S](table: String, hashKey: H, sortKey: S)
table:表名,通常为
String
类型
hashKey:通常为String
类型
sortKey:通常为String
类型
return:返回获取值,Array[Byte]
类型,你可以使用as[String]
转换为String
类型batchGet
读取一批数据,对get函数的批量封装。该函数并发地向server发送异步请求,并等待结果。如果有任意一个请求失败,就提前终止并抛出异常。如果抛出了异常,则values中的结果是未定义的,参见Java客户端文档#batchGet
def batchGet[H, S](table: String, keys: List[PegasusKey[H, S]])
table:表名,通常为
String
类型
keys:PegasusKey列表,由hashKey和SortKey组成
return:返回获取值列表,PegasusResultList
类型batchGet2
读取一批数据,对get函数的批量封装。该函数并发地向server发送异步请求,但与上面batchGet不同的是,无论请求成功还是失败,它都会等待所有请求结束,参见Java客户端文档#batchGet2
def batchGet2[H, S](table: String, keys: Seq[PegasusKey[H, S]])
table:表名,通常为
String
类型
keys:PegasusKey列表,有hashKey和SortKey组成
return:返回获取值列表,PegasusResultList
类型multiGet
Java client包含多种multiGet接口,提供读同一HashKey下的多行数据功能,这里封装的是:
public boolean multiGet(String tableName, byte[] hashKey, List<byte[]> sortKeys, int maxFetchCount, int maxFetchSize, List<Pair<byte[], byte[]>> values) throws PException;
支持最大数据量
maxFetchCount
和最大数据大小maxFetchSize
的参数设置,参见Java客户端文档#multiGetdef multiGet[H, S](table: String, hashKey: H, sortKeys: Seq[S], maxFetchCount: Int = 100, maxFetchSize: Int = 1000000)
table:表名,通常为
String
类型
hashKey:通常为String
类型
sortKeys:sortKey列表
maxFetchCount:最大获取数据量,这里默认为100
maxFetchSize:最大获取数据值大小,这里默认为1000000字节
return:返回获取值列表,convertMultiGetResult
类型multiGetRange
Java client包含多种multiGet接口,提供读同一HashKey下的多行数据功能,这里封装的是:
public boolean multiGet(String tableName, byte[] hashKey, byte[] startSortKey, byte[] stopSortKey, MultiGetOptions options, int maxFetchCount, int maxFetchSize, List<Pair<byte[], byte[]>> values) throws PException;
可以支持SortKey的范围查询和条件过滤,只读取满足特定条件的数据,参见Java客户端文档#multiGet
def multiGetRange[H, S](hashKey: H, startSortKey: S, stopSortKey: S, options: Options.MultiGet, maxFetchCount: Int = 100, maxFetchSize: Int = 1000000, timeout: Duration = 0 milli)
table:表名,通常为
String
类型
hashKey:hashKey,通常为String
类型
startSortKey:sortKey范围的起始值
stopSortKey:sortKey范围的终止值
options:查询条件
maxFetchCount:最大数据量,默认为100
maxFetchSize:最大数据值大小,默认为1000000字节
timeout:获取数据超时时间,默认为0,表示使用配置文件中的数值
return:返回获取值列表,convertMultiGetResult
类型batchMultiGet
对multiGet函数的批量封装。该函数并发地向server发送异步请求,并等待结果。如果有任意一个请求失败,就提前终止并抛出异常。如果抛出了异常,则values中的结果是未定义的,参见Java客户端文档#batchMultiGet
def batchMultiGet[H, S](keys: Seq[(H, Seq[S])], timeout: Duration = 0 milli)
keys:hashKey-sortKeys列表,如:
Seq(("1",Seq("1","2")),("1",Seq("1","2")))
timeout:获取数据超时时间,默认为0,表示使用配置文件中的数值
return:返回获取值列表,List
类型batchMultiGet2
对multiGet函数的批量封装。该函数并发地向server发送异步请求,并等待结果。但与上面batchMultiGet不同的是,无论请求成功还是失败,它都会等待所有请求结束,参见Java客户端文档#batchMultiGet2
def batchMultiGet2[H, S](keys: Seq[(H, Seq[S])], timeout: Duration = 0 milli)
keys:hashKey-sortKeys列表,如:
Seq(("1",Seq("1","2")),("1",Seq("1","2")))
timeout:获取数据超时时间,默认为0,表示使用配置文件中的数值
return:返回获取值列表,List
类型set
写单行数据
def set[H, S, V](hashKey: H, sortKey: S, value: V, ttl: Duration = 0 second, timeout: Duration = 0 milli)
hashKey:通常为
String
类型
sortKey:通常为String
类型
value:对应key的写入值,通常为String
类型
ttl:写入值保留时间,默认为0,表示永久保留
timeout:获取数据超时时间,默认为0,表示使用配置文件中的数值
return:无返回值batchSet
写一批数据,对set函数的批量封装。该函数并发地向server发送异步请求,并等待结果。如果有任意一个请求失败,就提前终止并抛出异常,参见Java客户端文档#batchSet
def batchSet[H, S, V](table: String, items: Seq[SetItem[H, S, V]])
table:表名,通常为
String
类型
items:写入值列表,由hashKey、sortKey、value组成
return:请求成功的个数(该方法不是原子的,有可能出现部分成功部分失败的情况,用户可以选择只使用成功的结果)batchSet2
对set函数的批量封装。该函数并发地向server发送异步请求,并等待结果。但与上面batchSet不同的是,无论请求成功还是失败,它都会等待所有请求结束,参见Java客户端文档#batchSet2
multiSet
Java client有两种接口,提供写同一HashKey下的多行数据,这里封装的是:
public void multiSet(String tableName, byte[] hashKey, List<Pair<byte[], byte[]>> values, int ttl_seconds) throws PException;
支持数据过期时间设定
def multiSet[H, S, V](table: String, hashKey: H, values: Seq[(S, V)], ttl: Duration = 0 second)
table:表名,通常为
String
类型
hashKey:通常为String
类型
value:写入值列表,由sortkey、value组成,如Seq(("hashKey1","sortKey1"),("hashKey2","sortKey2"))
ttl:写入值保留时间,默认为0,表示永久保留
return:无返回值batchMultitSet
对multiSet函数的批量封装。该函数并发地向server发送异步请求,并等待结果。如果有任意一个请求失败,就提前终止并抛出异常,参见Java客户端文档#batchMultiSet
def batchMultitSet[H, S, V](table: String, items: Seq[HashKeyData[H, S, V]], ttl: Duration = 0 second)
table:表名,通常为
String
类型
items:批量写入数据列表
ttl:写入值保留时间,默认为0,表示永久保留
return:请求成功的个数(该方法不是原子的,有可能出现部分成功部分失败的情况,用户可以选择只使用成功的结果)batchMultitSet2
对multiSet函数的批量封装。该函数并发地向server发送异步请求,并等待结果。但与上面batchMultiSet不同的是,无论请求成功还是失败,它都会等待所有请求结束,参见Java客户端文档#batchMultitSet2
def batchMultiSet2[H, S, V](table: String, items: Seq[HashKeyData[H, S, V]], ttl: Duration = 0 second)
table:表名,通常为
String
类型
items:批量写入数据列表
ttl:写入值保留时间,默认为0,表示永久保留
return:请求成功的个数(该方法不是原子的,有可能出现部分成功部分失败的情况,用户可以选择只使用成功的结果)del
删除单行数据,参见Java客户端文档#del
def del[H, S](table: String, hashKey: H, sortKey: S)
table:表名,通常为
String
类型
hashKey:通常为String
类型
sortkey:通常为String
类型
return:无返回值batchDel
删除一批数据,对del函数的批量封装。该函数并发地向server发送异步请求,并等待结果。如果有任意一个请求失败,就提前终止并抛出异常,参见Java客户端文档#batchDel
batchDel[H, S](table: String, keys: Seq[PegasusKey[H, S]])
table:表名,通常为
String
类型
keys:键值列表,由hashKey和sortKey组成
return:请求成功的个数(该方法不是原子的,有可能出现部分成功部分失败的情况,用户可以选择只使用成功的结果)batchDel2
对del函数的批量封装。该函数并发地向server发送异步请求,并等待结果。但与上面batchDel不同的是,无论请求成功还是失败,它都会等待所有请求结束,参见Java客户端文档#batchDel2
def batchDel2[H, S](table: String, keys: Seq[PegasusKey[H, S]])
multiDel
删同一HashKey下的多行数据,参见Java客户端文档#multiDel
def multiDel[H, S](table: String, hashKey: H, sortKeys: Seq[S])
table:表名,通常为
String
类型
hashKey:通常为String
类型
sortKeys:sortKey列表
return:无返回值batchMultiDel
对multiDel函数的批量封装。该函数并发地向server发送异步请求,并等待结果。如果有任意一个请求失败,就提前终止并抛出异常,参见Java客户端文档#batchMultiDel
def batchMultiDel[H, S](table: String, keys: Seq[(H, Seq[S])])
table:表名,通常为
String
类型
keys:键列表,由hashKey、sortKeys组成,如Seq(("hashKey1",(sortKey1,sortKey2),("hashKey2",(sortKey3,sortKey4))
return:无返回值batchMultiDel2
对del函数的批量封装。该函数并发地向server发送异步请求,并等待结果。但与上面batchMultiDel不同的是,无论请求成功还是失败,它都会等待所有请求结束,参见Java客户端文档#batchMultiDel2
def batchMultiDel2[H, S](table: String, keys: Seq[(H, Seq[S])])
table:表名,通常为
String
类型
keys:键列表,由hashKey、sortKeys组成,如Seq(("hashKey1",(sortKey1,sortKey2),("hashKey2",(sortKey3,sortKey4))
return:无返回值ttl
获取单行数据的TTL时间。TTL表示Time To Live,表示该数据还能存活多久。如果超过存活时间,数据就读不到了,参见Java客户端文档#ttl
def ttl[H, S](table: String, hashKey: H, sortKey: S)
table:表名,通常为
String
类型
hashKey:通常为String
类型
sortKeys:通常为String
类型
return:TTL时间,单位为秒。如果该数据没有设置TTL,返回-1;如果该数据不存在,返回-2incr
单行原子增(减)操作,详细说明参见单行原子操作,该操作先将key所指向的value的字节串转换为int64类型(实现上类似于Java的Long.parseLong()函数),然后加上increment,将结果转换为字节串设置为新值。当参数increment为正数时,即原子加;当参数increment为负数时,即原子减,参见Java客户端文档#incr。
def incr[H, S](table: String, hashKey: H, sortKey: S, increment: Long, ttl: Duration = 0 milli)
table:表名,通常为
String
类型
hashKey:通常为String
类型
sortKey:通常为String
类型
increment:增加值
ttl:值保留时间,默认为0,表示永久保留
return:操作成功后的新值ScalaPegasusTable接口
ScalaPegasusTable接口提供的方法均为同步API,
ScalaPegasusClient
接口即默认封装该接口,详细API信息参见ScalaPegasusClient接口ScalaPegasusAsyncTable
ScalaPegasusAsyncTable接口提供的方法均为异步API,封装了java client的异步接口。对应API功能可参考ScalaPegasusClient接口和Java客户端文档#PegasusTableInterface接口,接口封装形式如:
@throws[PException] def multiGet[H, S](hashKey: H, sortKeys: Seq[S], maxFetchCount: Int = 100, maxFetchSize: Int = 1000000, timeout: Duration = 0 milli) (implicit hSer: SER[H], sSer: SER[S]): Future[MultiGetResult[S, Array[Byte]]] = { val result = table.asyncMultiGet(hashKey, sortKeys, maxFetchCount, maxFetchSize, timeout) toScala(result)(convertMultiGetResult[S]) }
其中
table.asyncMultiGet(hashKey, sortKeys, maxFetchCount, maxFetchSize, timeout)
即Java client接口,参数传递原理参见实现原理,toScala(result)(convertMultiGetResult[S])
的完整形式如下:implicit private [scalaclient] def toScala[A, B](future: NFuture[A])(implicit f: A => B): Future[B] = { val promise = Promise[B]() future.addListener(new GenericFutureListener[NFuture[_ >: A]] { override def operationComplete(future: NFuture[_ >: A]): Unit = { if (future.isSuccess) { promise.success(f(future.get.asInstanceOf[A])) } else { promise.failure(future.cause()) } } }) promise.future }
使用
隐式转换
实现Java的异步编程到Scala的异步编程变换,该函数利用io.netty.util.concurrent.{GenericFutureListener, Future => NFuture}
实现异步编程。