Scala 客户端

获取客户端

项目地址:Pegasus scala client 下载:

git clone git@github.com:apache/incubator-pegasus.git
cd incubator-pegasus/scala-client

选择所使用的版本并构建,建议使用master版本。同时注意,scala客户端构建依赖Java客户端,请参考获取Java客户端在项目中添加Java依赖。你可以打包成Jar包进行使用:

sbt package

或者,安装到本地的sbt repository,方便在sbt项目中使用:

sbt publish-local

或者,安装到本地的maven repository:

sbt publish-m2

项目默认使用scala-2.11进行构建,打包发布时则同时发布2.11版本(pegasus-scala-client_2.11)和2.12版本(pegasus-scala-client_2.12),如果你的项目使用sbt构建,则可配置为:

//使用sbt仓库,不需要添加后缀,默认使用当前scala版本号,即使用2.12
scalaVersion := "2.12.8"
libraryDependencies ++= Seq(
    "com.xiaomi.infra" %% "pegasus-scala-client" % "1.11.4-1-SNAPSHOT"
)

或者配置为:

//使用maven仓库(你可以使用resolvers ++= Seq()添加自定义maven仓库),需要添加后缀
scalaVersion := "2.12.8"
libraryDependencies ++= Seq(
    "com.xiaomi.infra" % "pegasus-scala-client_2.11" % "1.11.4-1-SNAPSHOT"
)

如果你的项目通过maven构建,则可通过maven配置在项目中使用,例如:

<dependency>
    <groupId>com.xiaomi.infra</groupId>
    <artifactId>pegasus-scala-client_2.11</artifactId>
    <version>1.11.4-1</version>
</dependency>

使用客户端

获取实例

通过指定server配置信息获取实例,Scala提供两种获取实例的接口:
1、文件路径作为配置参数: 参见Java客户端文件配置

def createClient(configPath: String): ScalaPegasusClient

例如:

val pegasusClient = ScalaPegasusClientFactory.createClient("resource:///pegasus.properties")

2、Properties对象作为配置:

def createClient(props: Properties): ScalaPegasusClient

例如:

Properties pegasusConfig = new Properties();
pegasusConfig.setProperty("meta_servers", "127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603");
pegasusConfig.setProperty("operation_timeout", 100);
val pegasusClient = ScalaPegasusClientFactory.createClient(pegasusConfig)

数据操作

注意:调用函数前请确认导入Serializers._,详情参阅实现原理

val hashKey = 12345L
pegasusClient.set(table, hashKey, "sort_1", "value_1")
val value = pegasusClient.get(table, hashKey, "sort_1").as[String]
pegasusClient.del(table, hashKey, "sort_1")
pegasusClient.exists(table, hashKey, "sort_1") 
pegasusClient.sortKeyCount(table, hashKey)
pegasusClient.close

接口定义

scala的客户端类地址为:com.xiaomi.infra.pegasus.scalaclient,主要包括以下四个类:

类名 功能
ScalaPegasusClientFactory Client工厂类,用于创建Client实例
ScalaPegasusClient Client类,封装了各种同步API,也可用于创建Table实例
ScalaPegasusTable Table类,封装了操作单个Table数据的同步API
ScalaPegasusAsyncTable Table类,封装了操作单个Table数据的异步API

用户可以选择使用Client类(ScalaPegasusClient)或者是Table类(ScalaPegasusTable或者ScalaPegasusAsyncTable)存取数据,区别如下:

  • Client类直接在参数中指定表名,省去了打开表的动作,使用更便捷。
  • Table类同时支持同步和异步API,而Client类只支持同步API
  • Table类可以为每个操作设置单独的超时,而Client类无法单独指定超时,只能使用配置文件中的默认超时。
  • Table类的超时更准确,而Client类在首次读写请求时可能需要在内部初始化Table对象,所以首次读写的超时可能不太准确。

    ScalaPegasusClient接口

    实现原理

    ScalaPegasusClient接口通过持有ScalaPegasusTable实现对特定表的访问,而ScalaPegasusTable实际是封装了Java client的接口PegasusTableInterface而实现的。函数形式如下所示:

    def get[H, S](table: String, hashKey: H, sortKey: S)(implicit hSer: SER[H], sSer: SER[S]) = {
    getTable(table).get(hashKey, sortKey)
    }
    

    每一个数据表的操作函数都被定义为泛型函数,参数列表(table: String, hashKey: H, sortKey: S)是实际传入的参数,同时使用隐式参数(implicit hSer: SER[H], sSer: SER[S])完成对参数列表(table: String, hashKey: H, sortKey: S)泛型的转换。其中SER[H]是类Serializers的泛型声明,该类包含对不同泛型对象的隐式转换函数(转换成Java client中PegasusTableInterfacebyte[]参数,在scala中对应为Array[Byte],例子展示的是当泛型在使用的时候被定义为String时的隐式转换函数:

    implicit object Utf8String extends Serializer[String] {
    override def serialize(obj: String): Array[Byte] = if (obj == null) null else obj.getBytes("UTF-8")
    override def deserialize(bytes: Array[Byte]): String = if (bytes == null) null else  new String(bytes, "UTF-8")
    }
    

    客户端在调用ScalaPegasusClient提供的方法时,当对第一个参数列表的泛型参数传入String类型变量的时候,将被自动转换为Array[Byte]类型变量,并传入PegasusTableInterface的对应方法中。请确保包含Serializers._,否则无法完成参数的类型转换,你可以使用:

    import com.xiaomi.infra.pegasus.scalaclient.Serializers._
    

    导入依赖,目前接受的自动类型转换包括StringBooleanIntLongShortDouble,这些类型可自动转换为Array[Byte]

    API功能

    exists

    判断key是否存在,参见Java客户端文档#exist

    def exists[H, S](table: String, hashKey: H, sortKey: S)
    

    table:表名,通常为String类型
    hashKey:通常为String类型
    sortKey:通常为String类型
    return: 返回是否存在,boolean类型

    sortKeyCount

    获取一个hashkey下的sortkey值,参见Java客户端文档#sortKeyCount

    def sortKeyCount[H](table: String, hashKey: H)
    

    table:表名,通常为String类型
    hashKey:通常为String类型
    return:返回sortKeys个数,long类型

    get

    获取一条数据,参见Java客户端文档#get

    def get[H, S](table: String, hashKey: H, sortKey: S)
    

    table:表名,通常为String类型
    hashKey:通常为String类型
    sortKey:通常为String类型
    return:返回获取值,Array[Byte]类型,你可以使用as[String]转换为String类型

    batchGet

    读取一批数据,对get函数的批量封装。该函数并发地向server发送异步请求,并等待结果。如果有任意一个请求失败,就提前终止并抛出异常。如果抛出了异常,则values中的结果是未定义的,参见Java客户端文档#batchGet

    def batchGet[H, S](table: String, keys: List[PegasusKey[H, S]])
    

    table:表名,通常为String类型
    keys:PegasusKey列表,由hashKey和SortKey组成
    return:返回获取值列表,PegasusResultList类型

    batchGet2

    读取一批数据,对get函数的批量封装。该函数并发地向server发送异步请求,但与上面batchGet不同的是,无论请求成功还是失败,它都会等待所有请求结束,参见Java客户端文档#batchGet2

    def batchGet2[H, S](table: String, keys: Seq[PegasusKey[H, S]])
    

    table:表名,通常为String类型
    keys:PegasusKey列表,有hashKey和SortKey组成
    return:返回获取值列表,PegasusResultList类型

    multiGet

    Java client包含多种multiGet接口,提供读同一HashKey下的多行数据功能,这里封装的是:

    public boolean multiGet(String tableName, byte[] hashKey, 
                        List<byte[]> sortKeys, int maxFetchCount, 
                        int maxFetchSize, List<Pair<byte[], byte[]>> values) throws PException;
    

    支持最大数据量maxFetchCount和最大数据大小maxFetchSize的参数设置,参见Java客户端文档#multiGet

    def multiGet[H, S](table: String, hashKey: H, sortKeys: Seq[S],
            maxFetchCount: Int = 100, maxFetchSize: Int = 1000000)
    

    table:表名,通常为String类型
    hashKey:通常为String类型
    sortKeys:sortKey列表
    maxFetchCount:最大获取数据量,这里默认为100
    maxFetchSize:最大获取数据值大小,这里默认为1000000字节
    return:返回获取值列表,convertMultiGetResult类型

    multiGetRange

    Java client包含多种multiGet接口,提供读同一HashKey下的多行数据功能,这里封装的是:

    public boolean multiGet(String tableName, byte[] hashKey,
                    byte[] startSortKey, byte[] stopSortKey, MultiGetOptions options,
                    int maxFetchCount, int maxFetchSize,
                    List<Pair<byte[], byte[]>> values) throws PException;
    

    可以支持SortKey的范围查询和条件过滤,只读取满足特定条件的数据,参见Java客户端文档#multiGet

    def multiGetRange[H, S](hashKey: H, startSortKey: S, stopSortKey: S, 
                        options: Options.MultiGet,
                        maxFetchCount: Int = 100, maxFetchSize: Int = 1000000, 
                        timeout: Duration = 0 milli)
    

    table:表名,通常为String类型
    hashKey:hashKey,通常为String类型
    startSortKey:sortKey范围的起始值
    stopSortKey:sortKey范围的终止值
    options:查询条件
    maxFetchCount:最大数据量,默认为100
    maxFetchSize:最大数据值大小,默认为1000000字节
    timeout:获取数据超时时间,默认为0,表示使用配置文件中的数值
    return:返回获取值列表,convertMultiGetResult类型

    batchMultiGet

    对multiGet函数的批量封装。该函数并发地向server发送异步请求,并等待结果。如果有任意一个请求失败,就提前终止并抛出异常。如果抛出了异常,则values中的结果是未定义的,参见Java客户端文档#batchMultiGet

    def batchMultiGet[H, S](keys: Seq[(H, Seq[S])], timeout: Duration = 0 milli)
    

    keys:hashKey-sortKeys列表,如:Seq(("1",Seq("1","2")),("1",Seq("1","2")))
    timeout:获取数据超时时间,默认为0,表示使用配置文件中的数值
    return:返回获取值列表,List类型

    batchMultiGet2

    对multiGet函数的批量封装。该函数并发地向server发送异步请求,并等待结果。但与上面batchMultiGet不同的是,无论请求成功还是失败,它都会等待所有请求结束,参见Java客户端文档#batchMultiGet2

    def batchMultiGet2[H, S](keys: Seq[(H, Seq[S])], timeout: Duration = 0 milli)
    

    keys:hashKey-sortKeys列表,如:Seq(("1",Seq("1","2")),("1",Seq("1","2")))
    timeout:获取数据超时时间,默认为0,表示使用配置文件中的数值
    return:返回获取值列表,List类型

    set

    写单行数据

    def set[H, S, V](hashKey: H, sortKey: S, value: V, ttl: Duration = 0 second, timeout: Duration = 0 milli)
    

    hashKey:通常为String类型
    sortKey:通常为String类型
    value:对应key的写入值,通常为String类型
    ttl:写入值保留时间,默认为0,表示永久保留
    timeout:获取数据超时时间,默认为0,表示使用配置文件中的数值
    return:无返回值

    batchSet

    写一批数据,对set函数的批量封装。该函数并发地向server发送异步请求,并等待结果。如果有任意一个请求失败,就提前终止并抛出异常,参见Java客户端文档#batchSet

    def batchSet[H, S, V](table: String, items: Seq[SetItem[H, S, V]])
    

    table:表名,通常为String类型
    items:写入值列表,由hashKey、sortKey、value组成
    return:请求成功的个数(该方法不是原子的,有可能出现部分成功部分失败的情况,用户可以选择只使用成功的结果)

    batchSet2

    对set函数的批量封装。该函数并发地向server发送异步请求,并等待结果。但与上面batchSet不同的是,无论请求成功还是失败,它都会等待所有请求结束,参见Java客户端文档#batchSet2

    multiSet

    Java client有两种接口,提供写同一HashKey下的多行数据,这里封装的是:

    public void multiSet(String tableName, byte[] hashKey, 
                    List<Pair<byte[], byte[]>> values, 
                    int ttl_seconds) throws PException;
    

    支持数据过期时间设定

    def multiSet[H, S, V](table: String, hashKey: H, values: Seq[(S, V)], ttl: Duration = 0 second)
    

    table:表名,通常为String类型
    hashKey:通常为String类型
    value:写入值列表,由sortkey、value组成,如Seq(("hashKey1","sortKey1"),("hashKey2","sortKey2"))
    ttl:写入值保留时间,默认为0,表示永久保留
    return:无返回值

    batchMultitSet

    对multiSet函数的批量封装。该函数并发地向server发送异步请求,并等待结果。如果有任意一个请求失败,就提前终止并抛出异常,参见Java客户端文档#batchMultiSet

    def batchMultitSet[H, S, V](table: String, items: Seq[HashKeyData[H, S, V]], ttl: Duration = 0 second)
    

    table:表名,通常为String类型
    items:批量写入数据列表
    ttl:写入值保留时间,默认为0,表示永久保留
    return:请求成功的个数(该方法不是原子的,有可能出现部分成功部分失败的情况,用户可以选择只使用成功的结果)

    batchMultitSet2

    对multiSet函数的批量封装。该函数并发地向server发送异步请求,并等待结果。但与上面batchMultiSet不同的是,无论请求成功还是失败,它都会等待所有请求结束,参见Java客户端文档#batchMultitSet2

    def batchMultiSet2[H, S, V](table: String, items: Seq[HashKeyData[H, S, V]], ttl: Duration = 0 second)
    

    table:表名,通常为String类型
    items:批量写入数据列表
    ttl:写入值保留时间,默认为0,表示永久保留
    return:请求成功的个数(该方法不是原子的,有可能出现部分成功部分失败的情况,用户可以选择只使用成功的结果)

    del

    删除单行数据,参见Java客户端文档#del

    def del[H, S](table: String, hashKey: H, sortKey: S)
    

    table:表名,通常为String类型
    hashKey:通常为String类型
    sortkey:通常为String类型
    return:无返回值

    batchDel

    删除一批数据,对del函数的批量封装。该函数并发地向server发送异步请求,并等待结果。如果有任意一个请求失败,就提前终止并抛出异常,参见Java客户端文档#batchDel

    batchDel[H, S](table: String, keys: Seq[PegasusKey[H, S]])
    

    table:表名,通常为String类型
    keys:键值列表,由hashKey和sortKey组成
    return:请求成功的个数(该方法不是原子的,有可能出现部分成功部分失败的情况,用户可以选择只使用成功的结果)

    batchDel2

    对del函数的批量封装。该函数并发地向server发送异步请求,并等待结果。但与上面batchDel不同的是,无论请求成功还是失败,它都会等待所有请求结束,参见Java客户端文档#batchDel2

    def batchDel2[H, S](table: String, keys: Seq[PegasusKey[H, S]])
    

    multiDel

    删同一HashKey下的多行数据,参见Java客户端文档#multiDel

    def multiDel[H, S](table: String, hashKey: H, sortKeys: Seq[S])
    

    table:表名,通常为String类型
    hashKey:通常为String类型
    sortKeys:sortKey列表
    return:无返回值

    batchMultiDel

    对multiDel函数的批量封装。该函数并发地向server发送异步请求,并等待结果。如果有任意一个请求失败,就提前终止并抛出异常,参见Java客户端文档#batchMultiDel

    def batchMultiDel[H, S](table: String, keys: Seq[(H, Seq[S])])
    

    table:表名,通常为String类型
    keys:键列表,由hashKey、sortKeys组成,如Seq(("hashKey1",(sortKey1,sortKey2),("hashKey2",(sortKey3,sortKey4))
    return:无返回值

    batchMultiDel2

    对del函数的批量封装。该函数并发地向server发送异步请求,并等待结果。但与上面batchMultiDel不同的是,无论请求成功还是失败,它都会等待所有请求结束,参见Java客户端文档#batchMultiDel2

    def batchMultiDel2[H, S](table: String, keys: Seq[(H, Seq[S])])
    

    table:表名,通常为String类型
    keys:键列表,由hashKey、sortKeys组成,如Seq(("hashKey1",(sortKey1,sortKey2),("hashKey2",(sortKey3,sortKey4))
    return:无返回值

    ttl

    获取单行数据的TTL时间。TTL表示Time To Live,表示该数据还能存活多久。如果超过存活时间,数据就读不到了,参见Java客户端文档#ttl

    def ttl[H, S](table: String, hashKey: H, sortKey: S)
    

    table:表名,通常为String类型
    hashKey:通常为String类型
    sortKeys:通常为String类型
    return:TTL时间,单位为秒。如果该数据没有设置TTL,返回-1;如果该数据不存在,返回-2

    incr

    单行原子增(减)操作,详细说明参见单行原子操作,该操作先将key所指向的value的字节串转换为int64类型(实现上类似于Java的Long.parseLong()函数),然后加上increment,将结果转换为字节串设置为新值。当参数increment为正数时,即原子加;当参数increment为负数时,即原子减,参见Java客户端文档#incr

    def incr[H, S](table: String, hashKey: H, sortKey: S, increment: Long, ttl: Duration = 0 milli)
    

    table:表名,通常为String类型
    hashKey:通常为String类型
    sortKey:通常为String类型
    increment:增加值
    ttl:值保留时间,默认为0,表示永久保留
    return:操作成功后的新值

    ScalaPegasusTable接口

    ScalaPegasusTable接口提供的方法均为同步API,ScalaPegasusClient接口即默认封装该接口,详细API信息参见ScalaPegasusClient接口

    ScalaPegasusAsyncTable

    ScalaPegasusAsyncTable接口提供的方法均为异步API,封装了java client的异步接口。对应API功能可参考ScalaPegasusClient接口Java客户端文档#PegasusTableInterface接口,接口封装形式如:

    @throws[PException]
    def multiGet[H, S](hashKey: H, sortKeys: Seq[S], maxFetchCount: Int = 100, maxFetchSize: Int = 1000000, timeout: Duration = 0 milli)
            (implicit hSer: SER[H], sSer: SER[S]): Future[MultiGetResult[S, Array[Byte]]] = {
        val result = table.asyncMultiGet(hashKey, sortKeys, maxFetchCount, maxFetchSize, timeout)
        toScala(result)(convertMultiGetResult[S])
    }
    

    其中table.asyncMultiGet(hashKey, sortKeys, maxFetchCount, maxFetchSize, timeout)即Java client接口,参数传递原理参见实现原理toScala(result)(convertMultiGetResult[S])的完整形式如下:

    implicit private [scalaclient] def toScala[A, B](future: NFuture[A])(implicit f: A => B): Future[B] = {
        val promise = Promise[B]()
        future.addListener(new GenericFutureListener[NFuture[_ >: A]] {
            override def operationComplete(future: NFuture[_ >: A]): Unit = {
                if (future.isSuccess) {
                    promise.success(f(future.get.asInstanceOf[A]))
                } else {
                    promise.failure(future.cause())
                }
            }
        })
        promise.future
    }
    

    使用隐式转换实现Java的异步编程到Scala的异步编程变换,该函数利用io.netty.util.concurrent.{GenericFutureListener, Future => NFuture}实现异步编程。

Copyright © 2023 The Apache Software Foundation. Licensed under the Apache License, Version 2.0.

Apache Pegasus is an effort undergoing incubation at The Apache Software Foundation (ASF), sponsored by the Apache Incubator. Incubation is required of all newly accepted projects until a further review indicates that the infrastructure, communications, and decision making process have stabilized in a manner consistent with other successful ASF projects. While incubation status is not necessarily a reflection of the completeness or stability of the code, it does indicate that the project has yet to be fully endorsed by the ASF.

Apache Pegasus, Pegasus, Apache, the Apache feather logo, and the Apache Pegasus project logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.