获取Cpp客户端
首先需要编译Pegasus,编译完成后运行以下命令可以打包生产Cpp客户端库:
./run.sh pack_client
运行成功后,会在本地文件夹下生产pegasus-client-{version}-{platform}-{buildType}的文件夹以及tar.gz文件。在文件夹里面有个sample/文件夹,进去后可以运行make编译示例程序。
配置文件
Cpp客户端由于使用了rdsn框架,所以配置文件较为复杂,如下:
[apps..default]
run = true
count = 1
;network.client.RPC_CHANNEL_TCP = dsn::tools::sim_network_provider, 65536
;network.client.RPC_CHANNEL_UDP = dsn::tools::sim_network_provider, 65536
;network.server.0.RPC_CHANNEL_TCP = dsn::tools::sim_network_provider, 65536
[apps.mimic]
type = dsn.app.mimic
arguments =
pools = THREAD_POOL_DEFAULT
run = true
count = 1
[core]
;tool = simulator
;tool = fastrun
tool = nativerun
;toollets = tracer
;toollets = tracer, profiler, fault_injector
pause_on_start = false
cli_local = false
cli_remote = false
start_nfs = false
logging_start_level = LOG_LEVEL_DEBUG
logging_factory_name = dsn::tools::simple_logger
logging_flush_on_exit = true
enable_default_app_mimic = true
data_dir = ./data
[tools.simple_logger]
short_header = true
fast_flush = true
max_number_of_log_files_on_disk = 10
stderr_start_level = LOG_LEVEL_ERROR
[tools.hpc_logger]
per_thread_buffer_bytes = 8192
max_number_of_log_files_on_disk = 10
[tools.simulator]
random_seed = 0
[network]
; how many network threads for network library(used by asio)
io_service_worker_count = 4
; specification for each thread pool
[threadpool..default]
worker_count = 4
[threadpool.THREAD_POOL_DEFAULT]
name = default
partitioned = false
max_input_queue_length = 1024
worker_priority = THREAD_xPRIORITY_NORMAL
worker_count = 4
[task..default]
is_trace = false
is_profile = false
allow_inline = false
fast_execution_in_network_thread = false
rpc_call_header_format = NET_HDR_DSN
rpc_call_channel = RPC_CHANNEL_TCP
rpc_timeout_milliseconds = 5000
[pegasus.clusters]
onebox = @LOCAL_IP@:34601,@LOCAL_IP@:34602,@LOCAL_IP@:34603
another_cluster = @SOME_IP@:34601,@SOME_IP@:34601,@SOME_IP@:34601
配置文件的具体解释,请移步配置说明。
接口定义
创建Client实例
客户端工厂类
namespace pegasus {
class pegasus_client_factory
{
public:
///
/// \brief initialize
/// initialize pegasus client lib. must call this function before anything else.
/// \param config_file
/// the configuration file of client lib
/// \return
/// true indicate the initailize is success.
///
static bool initialize(const char *config_file);
///
/// \brief get_client
/// get an instance for a given cluster and a given app name.
/// \param cluster_name
/// the pegasus cluster name.
/// a cluster can have multiple apps.
/// \param app_name
/// an app is a logical isolated k-v store.
/// a cluster can have multiple apps.
/// \return
/// the client instance. DO NOT delete this client even after usage.
static pegasus_client *get_client(const char *cluster_name, const char *app_name);
};
} //end namespace
客户端的初始化,实际上是对rdsn框架初始化
if (!pegasus::pegasus_client_factory::initialize("config.ini")) {
fprintf(stderr, "ERROR: init pegasus failed\n");
return -1;
}
/**succeed, continue**/
注:
- 参数:config_file 见 配置文件 的介绍
- 返回值:bool值,true代表初始化成功,false初始化失败
- 该函数只需在一个进程生命周期内调用一次即可,并且非线程安全的
获取客户端
pegasus::pegasus_client* pg_client = pegasus::pegasus_client_factory::get_client("cluster_name", "table_name");
if (pg_client == nullptr) {
fprintf(stderr, "ERROR: get pegasus client failed\n");
return -1;
}
/*** do what you want with pg_client ****/
注意: get_client返回值不能明确的调用delete,也不能用智能指针封装,框架在停止的时候自动释放(底层使用单例来保存)
pegasus_client接口
Cpp客户端提供两种接口,异步API 和 同步API,同步API是基于异步API实现的,具体如下:
set
写单行数据
///
/// \brief set
/// store the k-v to the cluster.
/// key is composed of hashkey and sortkey.
/// \param hashkey
/// used to decide which partition to put this k-v
/// \param sortkey
/// all the k-v under hashkey will be sorted by sortkey.
/// \param value
/// the value we want to store.
/// \param timeout_milliseconds
/// if wait longer than this value, will return time out error
/// \param ttl_seconds
/// time to live of this value, if expired, will return not found; 0 means no ttl
/// \return
/// int, the error indicates whether or not the operation is succeeded.
/// this error can be converted to a string using get_error_string()
///
virtual int set(const std::string &hashkey,
const std::string &sortkey,
const std::string &value,
int timeout_milliseconds = 5000,
int ttl_seconds = 0,
internal_info *info = NULL) = 0;
注:
- internal_info 结构如下,主要是记录在写入成功之后,该条数据的一些信息,在使用之前需要判断info是否为空
struct internal_info { int32_t app_id; int32_t partition_index; int64_t decree; std::string server; internal_info() : app_id(-1), partition_index(-1), decree(-1) {} }
- 返回值:int值来表示是否成功,通过get_error_string() 函数来判断返回值的意义(下面的所有同步接口的返回值,都可以通过该函数判断返回值意义)
/// /// \brief get_error_string /// get error string /// all the function above return an int value that indicates an error can be converted into a /// string for human reading. /// \param error_code /// all the error code are defined in "error_def.h" /// \return /// virtual const char *get_error_string(int error_code) const = 0;
async_set
异步写单行数据
/// /// \brief asynchronous set /// store the k-v to the cluster. /// will not be blocked, return immediately. /// key is composed of hashkey and sortkey. /// \param hashkey /// used to decide which partition to put this k-v /// \param sortkey /// all the k-v under hashkey will be stored by sortkey. /// \param value /// the value we want to store. /// \param callback /// the callback function will be invoked after operation finished or error occurred. /// \param timeout_milliseconds /// if wait longer than this value, will return time out error. /// \param ttl_seconds /// time to live of this value, if expired, will return not found; 0 means no ttl. /// \return /// void. /// virtual void async_set(const std::string &hashkey, const std::string &sortkey, const std::string &value, async_set_callback_t &&callback = nullptr, int timeout_milliseconds = 5000, int ttl_seconds = 0) = 0;
multi_set
写多条数据(同一hashkey下面)
///
/// \brief multi_set (guarantee atomicity)
/// store multiple k-v of the same hashkey to the cluster.
/// \param hashkey
/// used to decide which partition to put this k-v
/// \param kvs
/// all <sortkey,value> pairs to be set. should not be empty
/// \param timeout_milliseconds
/// if wait longer than this value, will return time out error
/// \param ttl_seconds
/// time to live of this value, if expired, will return not found; 0 means no ttl
/// \return
/// int, the error indicates whether or not the operation is succeeded.
/// this error can be converted to a string using get_error_string().
/// return PERR_INVALID_ARGUMENT if param kvs is empty.
///
virtual int multi_set(const std::string &hashkey,
const std::map<std::string, std::string> &kvs,
int timeout_milliseconds = 5000,
int ttl_seconds = 0,
internal_info *info = NULL) = 0;
async_multi_set
异步写多条数据
///
/// \brief asynchronous multi_set (guarantee atomicity)
/// store multiple k-v of the same hashkey to the cluster.
/// will not be blocked, return immediately.
/// \param hashkey
/// used to decide which partition to put this k-v
/// \param kvs
/// all <sortkey,value> pairs to be set. should not be empty
/// \param callback
/// the callback function will be invoked after operation finished or error occurred.
/// \param timeout_milliseconds
/// if wait longer than this value, will return time out error
/// \param ttl_seconds
/// time to live of this value, if expired, will return not found; 0 means no ttl
/// \return
/// void.
///
virtual void async_multi_set(const std::string &hashkey,
const std::map<std::string, std::string> &kvs,
async_multi_set_callback_t &&callback = nullptr,
int timeout_milliseconds = 5000,
int ttl_seconds = 0) = 0;
get
读取一条数据
///
/// \brief get
/// get value by key from the cluster.
/// \param hashkey
/// used to decide which partition to get this k-v
/// \param sortkey
/// all the k-v under hashkey will be sorted by sortkey.
/// \param value
/// the returned value will be put into it.
/// \param timeout_milliseconds
/// if wait longer than this value, will return time out error
/// \return
/// int, the error indicates whether or not the operation is succeeded.
/// this error can be converted to a string using get_error_string().
/// returns PERR_NOT_FOUND if no value is found under the <hashkey,sortkey>.
///
virtual int get(const std::string &hashkey,
const std::string &sortkey,
std::string &value,
int timeout_milliseconds = 5000,
internal_info *info = NULL) = 0;
async_get
异步读取一条数据
///
/// \brief asynchronous get
/// get value by key from the cluster.
/// will not be blocked, return immediately.
/// \param hashkey
/// used to decide which partition to get this k-v
/// \param sortkey
/// all the k-v under hashkey will be sorted by sortkey.
/// \param callback
/// the callback function will be invoked after operation finished or error occurred.
/// \param timeout_milliseconds
/// if wait longer than this value, will return time out error
/// \return
/// void.
///
virtual void async_get(const std::string &hashkey,
const std::string &sortkey,
async_get_callback_t &&callback = nullptr,
int timeout_milliseconds = 5000) = 0;
multi_get
读取多条数据
///
/// \brief multi_get
/// get multiple value by key from the cluster.
/// \param hashkey
/// used to decide which partition to get this k-v
/// \param sortkeys
/// all the k-v under hashkey will be sorted by sortkey.
/// if empty, means fetch all sortkeys under the hashkey.
/// \param values
/// the returned <sortkey,value> pairs will be put into it.
/// if data is not found for some <hashkey,sortkey>, then it will not appear in the map.
/// \param max_fetch_count
/// max count of k-v pairs to be fetched. max_fetch_count <= 0 means no limit.
/// \param max_fetch_size
/// max size of k-v pairs to be fetched. max_fetch_size <= 0 means no limit.
/// \param timeout_milliseconds
/// if wait longer than this value, will return time out error
/// \return
/// int, the error indicates whether or not the operation is succeeded.
/// this error can be converted to a string using get_error_string().
/// returns PERR_OK if fetch done, even no data is returned.
/// returns PERR_INCOMPLETE is only partial data is fetched.
///
virtual int multi_get(const std::string &hashkey,
const std::set<std::string> &sortkeys,
std::map<std::string, std::string> &values,
int max_fetch_count = 100,
int max_fetch_size = 1000000,
int timeout_milliseconds = 5000,
internal_info *info = NULL) = 0;
注:max_fetch_count 和 max_fetch_size 分别从kv-pair的条数 和 总的大小来限制multi_get的返回值
async_multi_get
异步读取多条数据
///
/// \brief asynchronous multi_get
/// get multiple value by key from the cluster.
/// will not be blocked, return immediately.
/// \param hashkey
/// used to decide which partition to get this k-v
/// \param sortkeys
/// all the k-v under hashkey will be sorted by sortkey.
/// if empty, means fetch all sortkeys under the hashkey.
/// \param callback
/// the callback function will be invoked after operation finished or error occurred.
/// \param max_fetch_count
/// max count of k-v pairs to be fetched. max_fetch_count <= 0 means no limit.
/// \param max_fetch_size
/// max size of k-v pairs to be fetched. max_fetch_size <= 0 means no limit.
/// \param timeout_milliseconds
/// if wait longer than this value, will return time out error
/// \return
/// void.
///
virtual void async_multi_get(const std::string &hashkey,
const std::set<std::string> &sortkeys,
async_multi_get_callback_t &&callback = nullptr,
int max_fetch_count = 100,
int max_fetch_size = 1000000,
int timeout_milliseconds = 5000) = 0;
multi_get_sortkeys
获取hashkey下面的多个sortkey,不返回value
///
/// \brief multi_get_sortkeys
/// get multiple sort keys by hash key from the cluster.
/// only fetch sort keys, but not fetch values.
/// \param hashkey
/// used to decide which partition to get this k-v
/// \param sortkeys
/// the returned sort keys will be put into it.
/// \param max_fetch_count
/// max count of sort keys to be fetched. max_fetch_count <= 0 means no limit.
/// \param max_fetch_size
/// max size of sort keys to be fetched. max_fetch_size <= 0 means no limit.
/// \param timeout_milliseconds
/// if wait longer than this value, will return time out error
/// \return
/// int, the error indicates whether or not the operation is succeeded.
/// this error can be converted to a string using get_error_string().
/// returns PERR_OK if fetch done, even no data is returned.
/// returns PERR_INCOMPLETE is only partial data is fetched.
///
virtual int multi_get_sortkeys(const std::string &hashkey,
std::set<std::string> &sortkeys,
int max_fetch_count = 100,
int max_fetch_size = 1000000,
int timeout_milliseconds = 5000,
internal_info *info = NULL) = 0;
注:max_fetch_count 和 max_fetch_size 分别限制返回的sortkey的个数与总大小(计算大小的时候,为每条sortkey都计算一次hashkey的大小)
async_multi_get_sortkeys
异步获取hashkey下面的多个sortkey(不包含value)
///
/// \brief asynchronous multi_get_sortkeys
/// get multiple sort keys by hash key from the cluster.
/// only fetch sort keys, but not fetch values.
/// will not be blocked, return immediately.
/// \param hashkey
/// used to decide which partition to get this k-v
/// \param callback
/// the callback function will be invoked after operation finished or error occurred.
/// \param max_fetch_count
/// max count of sort keys to be fetched. max_fetch_count <= 0 means no limit.
/// \param max_fetch_size
/// max size of sort keys to be fetched. max_fetch_size <= 0 means no limit.
/// \param timeout_milliseconds
/// if wait longer than this value, will return time out error
/// \return
/// void.
///
virtual void async_multi_get_sortkeys(const std::string &hashkey,
async_multi_get_sortkeys_callback_t &&callback = nullptr,
int max_fetch_count = 100,
int max_fetch_size = 1000000,
int timeout_milliseconds = 5000) = 0;
exist
判断单条数据是否存在
///
/// \brief exist
/// check value exist by key from the cluster.
/// \param hashkey
/// used to decide which partition to get this k-v
/// \param sortkey
/// all the k-v under hashkey will be sorted by sortkey.
/// \param timeout_milliseconds
/// if wait longer than this value, will return time out error
/// \return
/// int, the error indicates whether or not the operation is succeeded.
/// this error can be converted to a string using get_error_string().
/// returns PERR_OK if exist.
/// returns PERR_NOT_FOUND if not exist.
///
virtual int exist(const std::string &hashkey,
const std::string &sortkey,
int timeout_milliseconds = 5000,
internal_info *info = NULL) = 0;
sortkey_count
统计hashkey下面的sortkey个数
///
/// \brief sortkey_count
/// get sortkey count by hashkey from the cluster.
/// \param hashkey
/// used to decide which partition to get this k-v
/// \param count
/// the returned sortkey count
/// \param timeout_milliseconds
/// if wait longer than this value, will return time out error
/// \return
/// int, the error indicates whether or not the operation is succeeded.
/// this error can be converted to a string using get_error_string().
///
virtual int sortkey_count(const std::string &hashkey,
int64_t &count,
int timeout_milliseconds = 5000,
internal_info *info = NULL) = 0;
del
删除单条数据
///
/// \brief del
/// del stored k-v by key from cluster
/// key is composed of hashkey and sortkey. must provide both to get the value.
/// \param hashkey
/// used to decide from which partition to del this k-v
/// \param sortkey
/// all the k-v under hashkey will be sorted by sortkey.
/// \param timeout_milliseconds
/// if wait longer than this value, will return time out error
/// \return
/// int, the error indicates whether or not the operation is succeeded.
/// this error can be converted to a string using get_error_string()
///
virtual int del(const std::string &hashkey,
const std::string &sortkey,
int timeout_milliseconds = 5000,
internal_info *info = NULL) = 0;
async_del
异步删除单条数据
///
/// \brief asynchronous del
/// del stored k-v by key from cluster
/// key is composed of hashkey and sortkey. must provide both to get the value.
/// will not be blocked, return immediately.
/// \param hashkey
/// used to decide from which partition to del this k-v
/// \param sortkey
/// all the k-v under hashkey will be sorted by sortkey.
/// \param callback
/// the callback function will be invoked after operation finished or error occurred.
/// \param timeout_milliseconds
/// if wait longer than this value, will return time out error
/// \return
/// void.
///
virtual void async_del(const std::string &hashkey,
const std::string &sortkey,
async_del_callback_t &&callback = nullptr,
int timeout_milliseconds = 5000) = 0;
multi_del
删除多条数据
///
/// \brief multi_del
/// delete multiple value by key from the cluster.
/// \param hashkey
/// used to decide which partition to get this k-v
/// \param sortkeys
/// all the k-v under hashkey will be sorted by sortkey. should not be empty.
/// \param deleted_count
/// return count of deleted k-v pairs.
/// \param timeout_milliseconds
/// if wait longer than this value, will return time out error
/// \return
/// int, the error indicates whether or not the operation is succeeded.
/// this error can be converted to a string using get_error_string().
///
virtual int multi_del(const std::string &hashkey,
const std::set<std::string> &sortkeys,
int64_t &deleted_count,
int timeout_milliseconds = 5000,
internal_info *info = NULL) = 0;
async_multi_del
异步的删除多条数据
///
/// \brief asynchronous multi_del
/// delete multiple value by key from the cluster.
/// will not be blocked, return immediately.
/// \param hashkey
/// used to decide which partition to get this k-v
/// \param sortkeys
/// all the k-v under hashkey will be sorted by sortkey. should not be empty.
/// \param callback
/// the callback function will be invoked after operation finished or error occurred.
/// \param timeout_milliseconds
/// if wait longer than this value, will return time out error
/// \return
/// void.
///
virtual void async_multi_del(const std::string &hashkey,
const std::set<std::string> &sortkeys,
async_multi_del_callback_t &&callback = nullptr,
int timeout_milliseconds = 5000) = 0;
ttl
获取单行数据的TTL时间。TTL表示Time To Live,表示该数据还能存活多久。如果超过存活时间,数据就读不到了
///
/// \brief ttl (time to live)
/// get ttl in seconds of this k-v.
/// key is composed of hashkey and sortkey. must provide both to get the value.
/// \param hashkey
/// used to decide which partition to get this k-v
/// \param sortkey
/// all the k-v under hashkey will be sorted by sortkey.
/// \param ttl_seconds
/// the returned ttl value in seconds.
/// \param timeout_milliseconds
/// if wait longer than this value, will return time out error
/// \return
/// int, the error indicates whether or not the operation is succeeded.
/// this error can be converted to a string using get_error_string()
///
virtual int ttl(const std::string &hashkey,
const std::string &sortkey,
int &ttl_seconds,
int timeout_milliseconds = 5000,
internal_info *info = NULL) = 0;
get_scanner
获取针对某个hashkey下的sortkey区间 [sortkeyA ~ sortkeyB)的一个scanner
///
/// \brief get hash scanner
/// get scanner for [start_sortkey, stop_sortkey) of hashkey
/// \param hashkey
/// cannot be empty
/// \param start_sortkey
/// sortkey to start with
/// \param stop_sortkey
/// sortkey to stop. ""(empty string) represents the max key
/// \param options
/// which used to indicate scan options, like which bound is inclusive
/// \param scanner
/// out param, used to get k-v
/// this pointer should be deleted when scan complete
/// \return
/// int, the error indicates whether or not the operation is succeeded.
/// this error can be converted to a string using get_error_string()
///
virtual int get_scanner(const std::string &hashkey,
const std::string &start_sortkey, // start from beginning if this set ""
const std::string &stop_sortkey, // to the last item if this set ""
const scan_options &options,
pegasus_scanner *&scanner) = 0;
async_get_scanner
异步获取针对某个hashkey下的sortkey区间 [sortkeyA ~ sortkeyB)的一个scanner
///
/// \brief async get hash scanner
/// get scanner for [start_sortkey, stop_sortkey) of hashkey
/// will not be blocked, return immediately.
/// \param hashkey
/// cannot be empty
/// \param start_sortkey
/// sortkey to start with
/// \param stop_sortkey
/// sortkey to stop. ""(empty string) represents the max key
/// \param options
/// which used to indicate scan options, like which bound is inclusive
/// \param callback
/// return status and scanner in callback, and the latter should be deleted when scan complete
///
virtual void
async_get_scanner(const std::string &hashkey,
const std::string &start_sortkey, // start from beginning if this set ""
const std::string &stop_sortkey, // to the last item if this set ""
const scan_options &options,
async_get_scanner_callback_t &&callback) = 0;
get_unordered_scanners
获取一个遍历所有数据的scanner
///
/// \brief get a bundle of scanners to iterate all k-v in table
/// scanners should be deleted when scan complete
/// \param max_split_count
/// the number of scanners returned will always <= max_split_count
/// \param options
/// which used to indicate scan options, like timeout_milliseconds
/// \param scanners
/// out param, used to get k-v
/// these pointers should be deleted
/// \return
/// int, the error indicates whether or not the operation is succeeded.
/// this error can be converted to a string using get_error_string()
///
virtual int get_unordered_scanners(int max_split_count,
const scan_options &options,
std::vector<pegasus_scanner *> &scanners) = 0;
async_get_unordered_scanners
异步获取一个遍历所有数据的scanner
///
/// \brief async get a bundle of scanners to iterate all k-v in table
/// scannners return by callback should be deleted when all scan complete
/// \param max_split_count
/// the number of scanners returned will always <= max_split_count
/// \param options
/// which used to indicate scan options, like timeout_milliseconds
/// \param callback; return status and scanner in this callback
///
virtual void
async_get_unordered_scanners(int max_split_count,
const scan_options &options,
async_get_unordered_scanners_callback_t &&callback) = 0;