Scala Client

Getting the Client

Project repository: Pegasus Scala Client

Download:

git clone git@github.com:apache/incubator-pegasus.git
cd incubator-pegasus/scala-client

Choose the version to use and build. It is recommended to use the master version. Note that the Scala client depends on the Java client. Please refer to Getting the Java Client to add the Java dependency to your project. You can package it into a Jar for use:

sbt package

Alternatively, install to the local sbt repository for convenient use in sbt projects:

sbt publish-local

Or install to the local Maven repository:

sbt publish-m2

By default, the project is built with Scala 2.11. When publishing, both 2.11 (pegasus-scala-client_2.11) and 2.12 (pegasus-scala-client_2.12) artifacts are released. If your project is built with sbt, you can configure:

// Using sbt repository, no suffix needed. It uses the current Scala version, i.e., 2.12
scalaVersion := "2.12.8"
libraryDependencies ++= Seq(
    "com.xiaomi.infra" %% "pegasus-scala-client" % "1.11.4-1-SNAPSHOT"
)

Or configure:

// Using a Maven repository (you can add custom Maven repos with resolvers ++= Seq()), suffix required
scalaVersion := "2.12.8"
libraryDependencies ++= Seq(
    "com.xiaomi.infra" % "pegasus-scala-client_2.11" % "1.11.4-1-SNAPSHOT"
)

If your project is built with Maven, add the dependency like:

<dependency>
    <groupId>com.xiaomi.infra</groupId>
    <artifactId>pegasus-scala-client_2.11</artifactId>
    <version>1.11.4-1</version>
</dependency>

Using the Client

Get an Instance

Obtain an instance by specifying server configuration. Scala provides two ways to get an instance:

1) Use file path as configuration parameter. Refer to Java client file-based configuration

def createClient(configPath: String): ScalaPegasusClient

Example:

val pegasusClient = ScalaPegasusClientFactory.createClient("resource:///pegasus.properties")

2) Use a Properties object as configuration:

def createClient(props: Properties): ScalaPegasusClient

Example:

Properties pegasusConfig = new Properties();
pegasusConfig.setProperty("meta_servers", "127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603");
pegasusConfig.setProperty("operation_timeout", 100);
val pegasusClient = ScalaPegasusClientFactory.createClient(pegasusConfig)

Data Operations

Note: Before calling functions, ensure you import Serializers._. See Implementation Details

val hashKey = 12345L
pegasusClient.set(table, hashKey, "sort_1", "value_1")
val value = pegasusClient.get(table, hashKey, "sort_1").as[String]
pegasusClient.del(table, hashKey, "sort_1")
pegasusClient.exists(table, hashKey, "sort_1") 
pegasusClient.sortKeyCount(table, hashKey)
pegasusClient.close

Interface Definition

The Scala client classes are located in com.xiaomi.infra.pegasus.scalaclient, mainly including the following four classes:

Class Name Functionality
ScalaPegasusClientFactory Client factory class for creating Client instances
ScalaPegasusClient Client class that encapsulates various synchronous APIs, and can also be used to create Table instances
ScalaPegasusTable Table class that encapsulates synchronous APIs for operating on a single table
ScalaPegasusAsyncTable Table class that encapsulates asynchronous APIs for operating on a single table

Users can choose to use the Client class (ScalaPegasusClient) or the Table classes (ScalaPegasusTable or ScalaPegasusAsyncTable) for data access. Differences:

  • The Client class specifies the table name directly in parameters, avoiding the need to open a table, making it simpler to use.
  • The Table classes support both synchronous and asynchronous APIs, while the Client class supports only synchronous APIs.
  • The Table classes allow individual timeout settings for each operation, whereas the Client class cannot specify timeouts individually and uses the default timeout in the configuration file.
  • The Table classes provide more accurate timeout behavior. The Client class may need to initialize the Table object internally on the first read/write request, causing the initial timeout to be less accurate.

ScalaPegasusClient Interface

Implementation Details

The ScalaPegasusClient interface accesses specific tables by holding a ScalaPegasusTable, which wraps the Java client interface PegasusTableInterface. The function form is as follows:

def get[H, S](table: String, hashKey: H, sortKey: S)(implicit hSer: SER[H], sSer: SER[S]) = {
    getTable(table).get(hashKey, sortKey)
}

Each table operation function is defined as a generic function. The parameter list (table: String, hashKey: H, sortKey: S) is the actual passed parameters, and implicit parameters (implicit hSer: SER[H], sSer: SER[S]) perform generic conversion of (table: String, hashKey: H, sortKey: S). SER[H] is a generic declaration in the Serializers class, which contains implicit conversion functions for different generic objects (converting into byte[] parameters in the Java client’s PegasusTableInterface, corresponding to Array[Byte] in Scala). The example shows the implicit conversion when the generic type is defined as String:

implicit object Utf8String extends Serializer[String] {
    override def serialize(obj: String): Array[Byte] = if (obj == null) null else obj.getBytes("UTF-8")
    override def deserialize(bytes: Array[Byte]): String = if (bytes == null) null else  new String(bytes, "UTF-8")
}

When calling methods provided by ScalaPegasusClient, if the generic parameter in the first parameter list is a String, it is automatically converted to Array[Byte] and passed to the corresponding method of PegasusTableInterface. Ensure Serializers._ is included; otherwise, parameter type conversion cannot be completed. You can use:

import com.xiaomi.infra.pegasus.scalaclient.Serializers._

to import dependencies. Currently supported automatic conversions include String, Boolean, Int, Long, Short, Double. These types can be automatically converted to Array[Byte].

API Functions

exists

Check whether a key exists. See Java Client Doc: exist

def exists[H, S](table: String, hashKey: H, sortKey: S)

table: table name, usually String hashKey: usually String sortKey: usually String return: whether it exists, boolean

sortKeyCount

Get the number of sort keys under a hash key. See Java Client Doc: sortKeyCount

def sortKeyCount[H](table: String, hashKey: H)

table: table name, usually String hashKey: usually String return: count of sortKeys, long

get

Get a single value. See Java Client Doc: get

def get[H, S](table: String, hashKey: H, sortKey: S)

table: String hashKey: String sortKey: String return: value, Array[Byte]. You can convert with as[String] to String

batchGet

Read a batch of values. It concurrently sends async requests to server and waits for results. If any request fails, it terminates early and throws an exception. If an exception is thrown, values in the result are undefined. See Java Client Doc: batchGet

def batchGet[H, S](table: String, keys: List[PegasusKey[H, S]])

table: String keys: list of PegasusKey, composed of hashKey and sortKey return: list of values, PegasusResultList

batchGet2

Read a batch of values, similar to batchGet. It concurrently sends async requests to server, but unlike the above, it waits for all requests to finish regardless of success or failure. See Java Client Doc: batchGet2

def batchGet2[H, S](table: String, keys: Seq[PegasusKey[H, S]])

table: String keys: list of PegasusKey, composed of hashKey and sortKey return: list of values, PegasusResultList

multiGet

The Java client includes multiple multiGet interfaces for reading multiple rows under the same hashKey. This wraps the following:

public boolean multiGet(String tableName, byte[] hashKey, 
                        List<byte[]> sortKeys, int maxFetchCount, 
                        int maxFetchSize, List<Pair<byte[], byte[]>> values) throws PException;

Supports parameters maxFetchCount and maxFetchSize. See Java Client Doc: multiGet

def multiGet[H, S](table: String, hashKey: H, sortKeys: Seq[S],
            maxFetchCount: Int = 100, maxFetchSize: Int = 1000000)

table: String hashKey: String sortKeys: list of sort keys maxFetchCount: maximum number of entries to fetch, default 100 maxFetchSize: maximum total bytes to fetch, default 1000000 return: list of values, convertMultiGetResult

multiGetRange

The Java client includes multiple multiGet interfaces. This wraps the following:

public boolean multiGet(String tableName, byte[] hashKey,
                    byte[] startSortKey, byte[] stopSortKey, MultiGetOptions options,
                    int maxFetchCount, int maxFetchSize,
                    List<Pair<byte[], byte[]>> values) throws PException;

Supports range query on sort keys and conditional filtering, only reading data that meets the conditions. See Java Client Doc: multiGet

def multiGetRange[H, S](hashKey: H, startSortKey: S, stopSortKey: S, 
                        options: Options.MultiGet,
                        maxFetchCount: Int = 100, maxFetchSize: Int = 1000000, 
                        timeout: Duration = 0 milli)

table: String hashKey: String startSortKey: start of sort key range stopSortKey: end of sort key range options: query options maxFetchCount: default 100 maxFetchSize: default 1000000 timeout: timeout for fetching, default 0 (use value from config file) return: list of values, convertMultiGetResult

batchMultiGet

Batch wrapper for multiGet. It concurrently sends async requests to server and waits for results. If any request fails, it terminates early and throws an exception. If an exception is thrown, values in the result are undefined. See Java Client Doc: batchMultiGet

def batchMultiGet[H, S](keys: Seq[(H, Seq[S])], timeout: Duration = 0 milli)

keys: list of hashKey-sortKeys, e.g., Seq(("1",Seq("1","2")),("1",Seq("1","2"))) timeout: timeout for fetching, default 0 (use value from config file) return: list of values, List

batchMultiGet2

Batch wrapper for multiGet. It concurrently sends async requests to server and waits for results, but unlike the above, it waits for all requests to finish regardless of success or failure. See Java Client Doc: batchMultiGet2

def batchMultiGet2[H, S](keys: Seq[(H, Seq[S])], timeout: Duration = 0 milli)

keys: list of hashKey-sortKeys, e.g., Seq(("1",Seq("1","2")),("1",Seq("1","2"))) timeout: default 0 return: list of values, List

set

Write a single row

def set[H, S, V](hashKey: H, sortKey: S, value: V, ttl: Duration = 0 second, timeout: Duration = 0 milli)

hashKey: usually String sortKey: usually String value: value to write, usually String ttl: time-to-live, default 0 (permanent) timeout: default 0 (use config file value) return: none

batchSet

Write a batch of entries, a batch wrapper for set. It concurrently sends async requests to server and waits for results. If any request fails, it terminates early and throws an exception. See Java Client Doc: batchSet

def batchSet[H, S, V](table: String, items: Seq[SetItem[H, S, V]])

table: String items: list composed of hashKey, sortKey, value return: number of successful requests (not atomic; partial success is possible. You may choose to use only successful results)

batchSet2

Batch wrapper for set. It concurrently sends async requests to server and waits for results, but unlike above, it waits for all requests to finish regardless of success or failure. See Java Client Doc: batchSet2

multiSet

The Java client provides two interfaces for writing multiple rows under the same hashKey. This wraps:

public void multiSet(String tableName, byte[] hashKey, 
                    List<Pair<byte[], byte[]>> values, 
                    int ttl_seconds) throws PException;

Supports TTL for entries

def multiSet[H, S, V](table: String, hashKey: H, values: Seq[(S, V)], ttl: Duration = 0 second)

table: String hashKey: String value: list of values composed of sortKey and value, e.g., Seq(("hashKey1","sortKey1"),("hashKey2","sortKey2")) ttl: default 0 (permanent) return: none

batchMultitSet

Batch wrapper for multiSet. It concurrently sends async requests to server and waits for results. If any request fails, it terminates early and throws an exception. See Java Client Doc: batchMultiSet

def batchMultitSet[H, S, V](table: String, items: Seq[HashKeyData[H, S, V]], ttl: Duration = 0 second)

table: String items: batch data to write ttl: default 0 (permanent) return: number of successful requests (not atomic; partial success is possible)

batchMultitSet2

Batch wrapper for multiSet. It concurrently sends async requests to server and waits for results, but unlike above, it waits for all requests to finish regardless of success or failure. See Java Client Doc: batchMultitSet2

def batchMultiSet2[H, S, V](table: String, items: Seq[HashKeyData[H, S, V]], ttl: Duration = 0 second)

table: String items: batch data to write ttl: default 0 return: number of successful requests (not atomic; partial success is possible)

del

Delete a single row. See Java Client Doc: del

def del[H, S](table: String, hashKey: H, sortKey: S)

table: String hashKey: String sortKey: String return: none

batchDel

Delete a batch of rows, batch wrapper for del. It concurrently sends async requests to server and waits for results. If any request fails, it terminates early and throws an exception. See Java Client Doc: batchDel

batchDel[H, S](table: String, keys: Seq[PegasusKey[H, S]])

table: String keys: list of keys composed of hashKey and sortKey return: number of successful requests (not atomic; partial success is possible)

batchDel2

Batch wrapper for del. It concurrently sends async requests to server and waits for results, but unlike above, it waits for all requests to finish regardless of success or failure. See Java Client Doc: batchDel2

def batchDel2[H, S](table: String, keys: Seq[PegasusKey[H, S]])

multiDel

Delete multiple rows under the same hashKey. See Java Client Doc: multiDel

def multiDel[H, S](table: String, hashKey: H, sortKeys: Seq[S])

table: String hashKey: String sortKeys: list of sort keys return: none

batchMultiDel

Batch wrapper for multiDel. It concurrently sends async requests to server and waits for results. If any request fails, it terminates early and throws an exception. See Java Client Doc: batchMultiDel

def batchMultiDel[H, S](table: String, keys: Seq[(H, Seq[S])])

table: String keys: list composed of hashKey and sortKeys, e.g., Seq(("hashKey1",(sortKey1,sortKey2)),("hashKey2",(sortKey3,sortKey4))) return: none

batchMultiDel2

Batch wrapper for del. It concurrently sends async requests to server and waits for results, but unlike above, it waits for all requests to finish regardless of success or failure. See Java Client Doc: batchMultiDel2

def batchMultiDel2[H, S](table: String, keys: Seq[(H, Seq[S])])

table: String keys: list composed of hashKey and sortKeys, e.g., Seq(("hashKey1",(sortKey1,sortKey2)),("hashKey2",(sortKey3,sortKey4))) return: none

ttl

Get the TTL of a single row. TTL (Time To Live) indicates how long the data will still be alive. If the TTL expires, the data cannot be read. See Java Client Doc: ttl

def ttl[H, S](table: String, hashKey: H, sortKey: S)

table: String hashKey: String sortKeys: String return: TTL in seconds. If no TTL is set, returns -1; if the data does not exist, returns -2

incr

Single-row atomic increment/decrement. See Single Row Atomic Operations for details. The operation first converts the value bytes pointed by the key to int64 (similar to Java’s Long.parseLong()), then adds increment, converts the result to bytes and sets it as the new value. If increment is positive, it increments; if negative, it decrements. See Java Client Doc: incr.

def incr[H, S](table: String, hashKey: H, sortKey: S, increment: Long, ttl: Duration = 0 milli)

table: String hashKey: String sortKey: String increment: amount to add ttl: default 0 (permanent) return: the new value after success

ScalaPegasusTable Interface

ScalaPegasusTable provides synchronous APIs. The ScalaPegasusClient interface wraps these by default. See ScalaPegasusClient Interface for details.

ScalaPegasusAsyncTable

ScalaPegasusAsyncTable provides asynchronous APIs, wrapping the Java client’s async interfaces. Refer to ScalaPegasusClient Interface and Java Client Doc: PegasusTableInterface. Example wrapper:

@throws[PException]
    def multiGet[H, S](hashKey: H, sortKeys: Seq[S], maxFetchCount: Int = 100, maxFetchSize: Int = 1000000, timeout: Duration = 0 milli)
            (implicit hSer: SER[H], sSer: SER[S]): Future[MultiGetResult[S, Array[Byte]]] = {
        val result = table.asyncMultiGet(hashKey, sortKeys, maxFetchCount, maxFetchSize, timeout)
        toScala(result)(convertMultiGetResult[S])
    }

Where table.asyncMultiGet(hashKey, sortKeys, maxFetchCount, maxFetchSize, timeout) is the Java client interface. See Implementation Details for parameter passing principles. The full form of toScala(result)(convertMultiGetResult[S]) is:

implicit private [scalaclient] def toScala[A, B](future: NFuture[A])(implicit f: A => B): Future[B] = {
        val promise = Promise[B]()
        future.addListener(new GenericFutureListener[NFuture[_ >: A]] {
            override def operationComplete(future: NFuture[_ >: A]): Unit = {
                if (future.isSuccess) {
                    promise.success(f(future.get.asInstanceOf[A]))
                } else {
                    promise.failure(future.cause())
                }
            }
        })
        promise.future
    }

This uses implicit conversion to transform Java async programming to Scala async programming, utilizing io.netty.util.concurrent.{GenericFutureListener, Future => NFuture}.

Copyright © 2023 The Apache Software Foundation. Licensed under the Apache License, Version 2.0.

Apache Pegasus is an effort undergoing incubation at The Apache Software Foundation (ASF), sponsored by the Apache Incubator. Incubation is required of all newly accepted projects until a further review indicates that the infrastructure, communications, and decision making process have stabilized in a manner consistent with other successful ASF projects. While incubation status is not necessarily a reflection of the completeness or stability of the code, it does indicate that the project has yet to be fully endorsed by the ASF.

Apache Pegasus, Pegasus, Apache, the Apache feather logo, and the Apache Pegasus project logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.