-
Notifications
You must be signed in to change notification settings - Fork 37
/
hdfs_query.c
189 lines (159 loc) · 6.14 KB
/
hdfs_query.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
/*-------------------------------------------------------------------------
*
* hdfs_query.c
* Planner helper functions to get information from Hive/Spark server.
*
* Portions Copyright (c) 2012-2019, PostgreSQL Global Development Group
* Portions Copyright (c) 2004-2024, EnterpriseDB Corporation.
*
* IDENTIFICATION
* hdfs_query.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "hdfs_fdw.h"
#include "libhive/jdbc/hiveclient.h"
static double hdfs_find_row_count(char *src);
/*
* In order to get number of rows in a hive table we use
* explain select * from names_tab;
* It produces a result similar to the following
+--------------------------------------------------------------------------------------------+--+
| Explain |
+--------------------------------------------------------------------------------------------+--+
| STAGE DEPENDENCIES: |
| Stage-0 is a root stage |
| |
| STAGE PLANS: |
| Stage: Stage-0 |
| Fetch Operator |
| limit: -1 |
| Processor Tree: |
| TableScan |
| alias: names_tab |
| Statistics: Num rows: 10 Data size: 36 Basic stats: PARTIAL Column stats: NONE |
| Select Operator |
| expressions: id (type: int), name (type: string) |
| outputColumnNames: _col0, _col1 |
| Statistics: Num rows: 10 Data size: 36 Basic stats: PARTIAL Column stats: NONE |
| ListSink |
| |
+--------------------------------------------------------------------------------------------+--+
17 rows selected (0.706 seconds)
*
* In order to extract number of rows from this output, each row of explain
* output is sent one by one to this function. The function tries to find the
* keyword "Statistics: Num rows:" in each row, if the row does not contain any
* such keyword it is rejected. Otherwise the characters from the line are
* picked right after the keyword and till a space is encountered. The
* characters are then converted to a float, and is used as row count.
*/
/*
 * hdfs_find_row_count
 * 		Extract the row estimate from one line of EXPLAIN output.
 *
 * Returns the number that follows the keyword "Statistics: Num rows: ", or
 * 0 when the line is NULL, too short to carry statistics, or does not
 * contain the keyword.  Marked static here to match the file-scope
 * prototype above.
 */
static double
hdfs_find_row_count(char *src)
{
	const char	statistics_str[] = "Statistics: Num rows: ";
	char	   *pos;

	/*
	 * Statistics lines in EXPLAIN output are long (see the sample above);
	 * anything shorter than 80 characters cannot carry the keyword plus a
	 * value, so reject it cheaply before searching.
	 */
	if (src == NULL || strlen(src) < 80)
		return 0;

	/* Does the passed line of explain output contain the keyword? */
	pos = strstr(src, statistics_str);
	if (pos == NULL)
		return 0;

	/*
	 * Parse the number directly after the keyword.  strtod() stops at the
	 * first character that is not part of a number, so the old fixed-size
	 * strncpy() staging buffer is unnecessary.
	 */
	return strtod(pos + strlen(statistics_str), NULL);
}
/*
 * hdfs_rowcount
 * 		Retrieve the estimated number of rows in the remote table.
 *
 * Runs a remote EXPLAIN (see hdfs_find_row_count() above for the output
 * format) and scans its result set line by line until a row estimate is
 * found.  Never returns less than 1000; see the comment at the bottom.
 */
double
hdfs_rowcount(int con_index, hdfs_opt *opt, PlannerInfo *root,
			  RelOptInfo *baserel, HDFSFdwRelationInfo *fpinfo)
{
	StringInfoData explain_sql;
	double		num_rows = 0;

	initStringInfo(&explain_sql);
	hdfs_deparse_explain(opt, &explain_sql);
	hdfs_query_execute(con_index, opt, explain_sql.data);

	/* Walk the EXPLAIN output until one line yields a non-zero estimate. */
	for (;;)
	{
		bool		is_null;
		char	   *line;

		if (hdfs_fetch(con_index) != 0)
			break;

		line = hdfs_get_field_as_cstring(con_index, 0, &is_null);
		if (is_null)
			continue;

		num_rows = hdfs_find_row_count(line);
		if (num_rows != 0.0)
			break;
	}

	hdfs_close_result_set(con_index);

	/*
	 * Any value below 1000 is going to cause materialization step to be
	 * skipped from the plan, which is very crucial to avoid requery in
	 * rescan.
	 */
	return (num_rows > 1000) ? num_rows : 1000;
}
/*
 * hdfs_analyze
 * 		Sends analyze query to remote server to compute the statistics.
 */
void
hdfs_analyze(int con_index, hdfs_opt *opt, Relation rel)
{
	StringInfoData analyze_sql;

	initStringInfo(&analyze_sql);
	hdfs_deparse_analyze(&analyze_sql, rel);

	/*
	 * Spark accepts the ANALYZE statement through the regular query path;
	 * for other client types it must go through the utility path.
	 */
	if (opt->client_type == SPARKSERVER)
		hdfs_query_execute(con_index, opt, analyze_sql.data);
	else
		hdfs_query_execute_utility(con_index, opt, analyze_sql.data);

	hdfs_close_result_set(con_index);
}
/*
 * hdfs_describe
 * 		This function sends describe query to the remote server and
 * 		retrieves the total size of the data in remote table.
 *
 * Returns the value of the 'totalSize' property as a double, or 0 when the
 * property is not found (or its value is NULL).
 */
double
hdfs_describe(int con_index, hdfs_opt *opt, Relation rel)
{
	double		total_size = 0;
	StringInfoData sql;

	initStringInfo(&sql);
	hdfs_deparse_describe(&sql, rel);
	hdfs_query_execute(con_index, opt, sql.data);

	/*
	 * hdfs_deparse_describe() sends a query of the form "DESCRIBE FORMATTED
	 * sometab" to the remote server, which produces columnar output.  The
	 * code below reads the property name from field 1 and its value from
	 * field 2 of each row until it finds 'totalSize'.  (NOTE(review): the
	 * original comment described these as 0-indexed columns 1 and 2; the
	 * hiveclient field numbering convention cannot be confirmed from this
	 * file.)
	 */
	while (hdfs_fetch(con_index) == 0)
	{
		char	   *value;
		bool		is_null;

		value = hdfs_get_field_as_cstring(con_index, 1, &is_null);
		if (is_null)
			continue;

		if (strstr(value, "totalSize") != NULL)
		{
			char	   *str;

			str = hdfs_get_field_as_cstring(con_index, 2, &is_null);

			/*
			 * Guard against a NULL value column: the original code passed
			 * the fetched pointer straight to strtod(), which is undefined
			 * behavior when the field is NULL.
			 */
			if (!is_null)
				total_size = strtod(str, NULL);
			break;
		}
	}

	hdfs_close_result_set(con_index);
	return total_size;
}