Loading...
墨滴

IT明哥

2021/08/03  阅读:59  主题:默认主题

浅析 hive udaf 的正确编写方式- 论姿势的重要性-系列四

浅析 hive udaf 的正确编写方式- 论姿势的重要性-系列四-如何直接访问metastore service(附源码)

前言

大家好,我是明哥。

HIVE 作为大数据生态的数仓解决方案,因为历史的原因在很多行业很多公司都有着广泛的应用。对于比较复杂的业务逻辑,HIVE SQL 往往比较难以表达,此时大家在开发中往往会辅以 HIVE UDF。所以充分理解和掌握 HIVE UDF正确的表写和使用方式,是大数据从业人员必不可少的一项技能。

对于 HIVE UDF 编写使用过程中常见的问题,明哥编写了一个系列 - “浅析 hive udf 的正确编写和使用方式 - 论姿势的重要性“,并陆续发布了三篇博文:

  • “如何在 hive udf 中访问配置数据-方案汇总与对比”
  • “浅析 hive udaf 的正确编写方式- 论姿势的重要性"
  • “浅析 hive udf 的正确编写和使用方式- 论姿势的重要性 - 系列三 - hdfs 相对路径与静态代码块引起的问题”

以下是该系列的第四篇博文:“浅析 hive udaf 的正确编写方式- 论姿势的重要性-系列四-如何直接访问metastore service”

问题概述

在编写UDF时,有时我们需要直接访问 hive metastore service 以获取某些库表或分区的元数据信息,比如最近遇到某客户有个需求,需要直接访问 metastore service 以获取给定分区表的最大分区。该客户已经实现了基本的代码,且在没有开启 kerberos 认证的 hive 中也通过了测试,但在开启了 kerberos 认证的 hive 中执行该 UDF 时却会报错。 该有 BUG 的 UDF 示例代码如下:

hive-udf-code-with-bug
hive-udf-code-with-bug

该有 BUG 的 UDF 在开启了 kerberos 认证的 hive 中执行时报错如下: hive-udf-error-msg

问题原因分析

在本系列文章中,笔者不断提到,“Hive SQL 和 UDF 的解析编译和优化是在 hiveserver2 中进行的,解析编译和优化的结果一般是生成 mr/tez/spark 任务,这些 mr/tez/spark 任务是在向 yarn 申请获得的 container 容器对应的 jvm 中执行的;但并不是所有的 sql 和 udf 都会生成 mr/tez/spark 任务,此时其真正的执行就是直接在 hiveserver2 这个已经存在的 jvm 中执行的,该 hiveserver2 这个 jvm 的生命周期跟 udf 的执行无关,如果涉及到配置环境变量,系统参数,或加载类及执行静态代码块,要尤其小心“.

其实这里原来的代码有问题的原因,就跟上面不断强调的这句话有关: hive 的 udf 是在 hiveserver2 这个 Jvm 进程中编译执行的(有时会生成mr作业有时不会,在我们这个场景下不会生成 mr 作业),而 hiveserver2 这个 jvm 进程已经通过 kerberos 认证了,已经能够正常访问 hive metastore service 了,所以 udf 代码中不需要再次执行 UserGroupInformation.loginUserFromKeytab(principal,keytab)进行 kerberos 认证!事实上,由于再次尝试认证时涉及到配置环境变量和系统参数(如示例代码中 java.security.krb5.conf, HiveConf 等),稍有不慎就会污染已经启动的 hiveserver2 这个 jvm s 实例,而该实例是全局的给多个客户端使用的 hive 服务端,一旦被污染会影响其它客户端的执行,危害比较大,往往需要重启 hiveserver2 实例才能修复。

问题解决方案

知道了问题发生的根本原因,问题解决思路就有了:

  • 不要再次执行 kerberos 认证代码 UserGroupInformation.loginUserFromKeytab(principal,keytab);
  • 不要重新创建全新的 org.apache.hadoop.hive.conf.HiveConf 实例,而是复用已有的 HiveConf 实例;
  • 已有的 HiveConf 实例,可以通过 HiveConf hiveConf = SessionState.get().getConf() 获得;

最后还有必要指出,hive udf 和 udaf 有两种接口,一种是旧的 simple Udf/udaf,一种是新的GenericUDF/GenericUDAF:

  • org.apache.hadoop.hive.ql.exec.UDF/org.apache.hadoop.hive.ql.exec.UDAF
  • org.apache.hadoop.hive.ql.udf.generic.GenericUDF/org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator

因为支持更多特性和执行时的性能问题,hive 社区推荐我们使用后者。

示例代码 (已经通过none/ldap/kerberos认证环境下的测试,可以直接使用)

simple Udf 示例代码:

package com.xxx;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.service.server.HiveServer2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;

@Description(name = "max_pt", value = "_FUNC_(db,table) - return the max partition for a hive partitioned table")

//note that hive udf are executed in the same jvm as hiveserver2
public class UdfMaxPt extends UDF {
    // this is the same logger as the one created in org.apache.hive.service.server.HiveServer2
    private static final Logger LOG = LoggerFactory.getLogger(HiveServer2.class);

    public String evaluate(String db, String table) {
        try {
            // note that you can get the HiveConf from org.apache.hadoop.hive.ql.session.SessionState
            HiveConf hiveConf = SessionState.get().getConf();
            HiveMetaStoreClient hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
            List<Partition> partitions = hiveMetaStoreClient.listPartitions(db, table, Short.MAX_VALUE);

            // there is no need to call UserGroupInformation.loginUserFromKeytab(principal,keytab) to log into kdc,
            // as the udf is executed in hiveserver2 and hiveserver2 has already logged into kdc;
            // this can be verified by check below outputs
//          String principal = "hive/_HOST@CDH.COM";
//          String keytab= "hive.keytab";
//          UserGroupInformation.loginUserFromKeytab(principal,keytab);


            LOG.info("UserGroupInformation.getCurrentUser(): " + UserGroupInformation.getCurrentUser().toString());
            LOG.info("UserGroupInformation.getLoginUser(): " + UserGroupInformation.getLoginUser().toString());
            LOG.info("principal:" + hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL));
            LOG.info("keytab:" + hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB));
            LOG.info("java.security.krb5.conf" + System.getProperty("java.security.krb5.conf"));
            LOG.info("java.security.krb5.realm" + System.getProperty("java.security.krb5.realm"));
            LOG.info("java.security.krb5.kdc" + System.getProperty("java.security.krb5.kdc"));
            LOG.info("HADOOP_USER_NAME" + System.getenv("HADOOP_USER_NAME"));
            LOG.info("HADOOP_OPTS" + System.getenv("HADOOP_OPTS"));
            // return the max partition value
            return String.valueOf(Collections.max(partitions).getValues().get(0));
        } catch (Exception e) {
            LOG.warn(e.getMessage());
            LOG.warn(e.toString());
            return e.getMessage();
        }
    }

}

GenericUDF 示例代码:

package com.xxxx;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hive.service.server.HiveServer2;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;

@Description(name = "max_pt", value = "_FUNC_(db,table) - return the max partition for a hive partitioned table")
//note that hive udf are executed in the same jvm as hiveserver2
public class UdfMaxPtNew extends GenericUDF {
    // this is the same logger as the one created in org.apache.hive.service.server.HiveServer2
    private static final Logger LOG = LoggerFactory.getLogger(HiveServer2.class);
    PrimitiveObjectInspector inputDbOI;
    PrimitiveObjectInspector inputTableOI;
    PrimitiveObjectInspector outputOI;

    @Override
    public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        // This UDF accepts one argument
        assert (args.length == 2);
        // The first argument is a primitive type
        assert (args[0].getCategory() == ObjectInspector.Category.PRIMITIVE);
        assert (args[1].getCategory() == ObjectInspector.Category.PRIMITIVE);

        inputDbOI = (PrimitiveObjectInspector) args[0];
        inputTableOI = (PrimitiveObjectInspector) args[1];

        /* We only support String type */
        assert (inputDbOI.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING);
        assert (inputTableOI.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING);

        /* And we'll return a type string, so let's return the corresponding object inspector */
        outputOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;

        return outputOI;
    }

    @Override
    public Object evaluate(DeferredObject[] args) throws HiveException {
        if (args.length != 2) {
            return "";
        }
// Access the deferred value. Hive passes the arguments as "deferred" objects
// to avoid some computations if we don't actually need some of the values
        Object oin1 = args[0].get();
        Object oin2 = args[1].get();
        if (oin1 == null || oin2 == null) {
            return "";
        }
        String inputDb = (String) inputDbOI.getPrimitiveJavaObject(oin1);
        String inputTable = (String) inputTableOI.getPrimitiveJavaObject(oin2);
        if (StringUtils.isEmpty(inputDb) || StringUtils.isEmpty(inputTable)) {
            return "";
        }

        HiveConf hiveConf = SessionState.get().getConf();
        HiveMetaStoreClient hiveMetaStoreClient = null;
        try {
            hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
            List<Partition> partitions = hiveMetaStoreClient.listPartitions(inputDb, inputTable, Short.MAX_VALUE);
            return String.valueOf(Collections.max(partitions).getValues().get(0));
        } catch (TException e) {
            LOG.warn(e.getMessage());
            LOG.warn(e.toString());
            return e.getMessage();
        }
    }

    @Override
    public String getDisplayString(String[] children) {
        return "return the max partition for a hive partitioned table";
    }
}

来自客户的认可才是最大的认可,以上示例代码交付给客户后,明哥收到了客户的赞赏,允许我小小的得瑟一下,哈哈。

feed back from client
feed back from client

!关注不迷路~ 各种福利、资源定期分享!欢迎有想法、乐于分享的小伙伴们, 扫码加群交流!

wechat-group
wechat-group

IT明哥

2021/08/03  阅读:59  主题:默认主题

作者介绍

IT明哥