gohive

package module
v1.0.2
Published: Sep 8, 2022 License: MIT Imports: 22 Imported by: 0

README

GoHive

GoHive is a Go driver for Hive and the Spark Distributed SQL Engine. It supports the connection mechanisms KERBEROS (GSSAPI SASL), NONE (Plain SASL), LDAP, CUSTOM and NOSASL, over both binary and HTTP transport, with and without SSL. The Kerberos mechanism picks a different authentication level depending on hive.server2.thrift.sasl.qop.

Installation

Gohive can be installed with:

go get github.com/baiweiguo/gohive

To add kerberos support gohive requires header files to build against the GSSAPI C library. They can be installed with:

  • Ubuntu: sudo apt-get install libkrb5-dev
  • MacOS: brew install homebrew/dupes/heimdal --without-x11
  • CentOS/RHEL: yum install -y krb5-devel

Then:

go get -tags kerberos github.com/baiweiguo/gohive

Quickstart

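    ctx := context.Background()
    // See "Supported connections" for the fields each mechanism needs.
    configuration := gohive.NewConnectConfiguration()
    configuration.Service = "hive"

    connection, errConn := gohive.Connect("hs2.example.com", 10000, "KERBEROS", configuration)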
    if errConn != nil {
        log.Fatal(errConn)
    }
    cursor := connection.Cursor()

    cursor.Exec(ctx, "INSERT INTO myTable VALUES(1, '1'), (2, '2'), (3, '3'), (4, '4')")
    if cursor.Err != nil {
        log.Fatal(cursor.Err)
    }

    cursor.Exec(ctx, "SELECT * FROM myTable")
    if cursor.Err != nil {
        log.Fatal(cursor.Err)
    }

    var i int32
    var s string
    for cursor.HasMore(ctx) {
        cursor.FetchOne(ctx, &i, &s)
        if cursor.Err != nil {
            log.Fatal(cursor.Err)
        }
        log.Println(i, s)
    }

    cursor.Close()
    connection.Close()

cursor.HasMore may query Hive for more rows if not all of them have been received. Once a row is read, it is discarded from memory, so as long as the fetch size is not too big there is no limit to how much data can be queried.
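The batching behaviour described above can be sketched with a small stand-in cursor. This is only an illustration of the idea, not GoHive's implementation: batchCursor, hasMore, fetchOne and drain are hypothetical names, and the "server" is just a counter.

```go
package main

import "fmt"

// batchCursor is a hypothetical stand-in for a server-backed cursor.
type batchCursor struct {
	total     int   // rows the fake server holds
	served    int   // rows the fake server has already sent
	fetchSize int   // rows requested per round trip
	buf       []int // rows received but not yet read
	fetches   int   // number of round trips made
	maxHeld   int   // largest number of rows buffered at once
}

// hasMore refills the buffer when it is empty and rows remain on the server.
func (c *batchCursor) hasMore() bool {
	if len(c.buf) == 0 && c.served < c.total {
		n := c.fetchSize
		if rest := c.total - c.served; rest < n {
			n = rest
		}
		for i := 0; i < n; i++ {
			c.buf = append(c.buf, c.served+i)
		}
		c.served += n
		c.fetches++
		if len(c.buf) > c.maxHeld {
			c.maxHeld = len(c.buf)
		}
	}
	return len(c.buf) > 0
}

// fetchOne returns the next row and drops it from the buffer.
func (c *batchCursor) fetchOne() int {
	row := c.buf[0]
	c.buf = c.buf[1:]
	return row
}

// drain reads every row and reports rows read, round trips and peak buffer size.
func drain(total, fetchSize int) (rows, fetches, maxHeld int) {
	c := &batchCursor{total: total, fetchSize: fetchSize}
	for c.hasMore() {
		c.fetchOne()
		rows++
	}
	return rows, c.fetches, c.maxHeld
}

func main() {
	rows, fetches, maxHeld := drain(2500, 1000)
	fmt.Println(rows, fetches, maxHeld)
}
```

In this sketch the peak number of buffered rows never exceeds the fetch size, however many rows the query returns.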

Supported connections

Connect with Sasl kerberos:
configuration := NewConnectConfiguration()
configuration.Service = "hive"
// kinit must have been run beforehand: kinit -kt ./secret.keytab hive/hs2.example.com@EXAMPLE.COM
connection, errConn := Connect("hs2.example.com", 10000, "KERBEROS", configuration)

This implies setting in hive-site.xml:

  • hive.server2.authentication = KERBEROS
  • hive.server2.authentication.kerberos.principal = hive/_HOST@EXAMPLE.COM
  • hive.server2.authentication.kerberos.keytab = path/to/keytab.keytab

Connect using Plain Sasl:
configuration := NewConnectConfiguration()
// If it's not set it will be picked up from the logged user
configuration.Username = "myUsername"
// This may not be necessary
configuration.Password = "myPassword"
connection, errConn := Connect("hs2.example.com", 10000, "NONE", configuration)

This implies setting in hive-site.xml:

  • hive.server2.authentication = NONE

Connect using No Sasl:
connection, errConn := Connect("hs2.example.com", 10000, "NOSASL", NewConnectConfiguration())

This implies setting in hive-site.xml:

  • hive.server2.authentication = NOSASL
Connect using Http transport mode

Binary transport mode is supported for these three options (PLAIN, KERBEROS and NOSASL). Http transport is supported for PLAIN and KERBEROS:

configuration := NewConnectConfiguration()
configuration.HTTPPath = "cliservice" // this is the default path in hive configuration.
configuration.TransportMode = "http"
configuration.Service = "hive"

connection, errConn := Connect("hs2.example.com", 10000, "KERBEROS", configuration)

This implies setting in hive-site.xml:

  • hive.server2.authentication = KERBEROS or NONE
  • hive.server2.transport.mode = http
  • hive.server2.thrift.http.port = 10001
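
To make the relationship between host, port and HTTP path concrete, here is a minimal sketch that assembles an endpoint URL from those three values. The endpoint helper and its useTLS flag are hypothetical and only show how the pieces fit together; the driver's own URL construction may differ.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
	"strings"
)

// endpoint is a hypothetical helper that joins host, port and HTTP path
// into a URL. It is not part of GoHive.
func endpoint(host string, port int, httpPath string, useTLS bool) string {
	scheme := "http"
	if useTLS {
		scheme = "https"
	}
	u := url.URL{
		Scheme: scheme,
		Host:   net.JoinHostPort(host, strconv.Itoa(port)),
		Path:   "/" + strings.TrimPrefix(httpPath, "/"),
	}
	return u.String()
}

func main() {
	// 10001 matches the hive.server2.thrift.http.port value listed above.
	fmt.Println(endpoint("hs2.example.com", 10001, "cliservice", false))
}
```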

Zookeeper

A connection can be made using zookeeper:

connection, errConn := ConnectZookeeper("zk1.example.com:2181,zk2.example.com:2181", "NONE", configuration)

The last two parameters determine how the connection to hive will be made once the hive hosts are retrieved from zookeeper.
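
The first parameter is a comma-separated list of host:port pairs. A caller who wants to validate that string before connecting could do something like the following sketch. splitHosts is a hypothetical helper, not part of GoHive.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// splitHosts is a hypothetical helper that checks every entry of a
// comma-separated zookeeper host list has the form host:port.
func splitHosts(hosts string) ([]string, error) {
	var out []string
	for _, h := range strings.Split(hosts, ",") {
		h = strings.TrimSpace(h)
		if _, _, err := net.SplitHostPort(h); err != nil {
			return nil, fmt.Errorf("invalid zookeeper host %q: %w", h, err)
		}
		out = append(out, h)
	}
	return out, nil
}

func main() {
	hosts, err := splitHosts("zk1.example.com:2181,zk2.example.com:2181")
	fmt.Println(hosts, err)
}
```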

NULL values

If a row contains a NULL value, the following puts 0 into i, which makes it indistinguishable from a real 0:

var i int32
cursor.FetchOne(context.Background(), &i)

To differentiate between these two values (NULL and 0) the following will set i to nil or *i to 0:

var i *int32 = new(int32)
cursor.FetchOne(context.Background(), &i)

which will produce the same result as:

var i *int32
cursor.FetchOne(context.Background(), &i)
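
The pointer-to-pointer convention can be illustrated without a Hive server. In this sketch scanInt32 is a hypothetical function that mimics the behaviour described above (a nil value stands for SQL NULL); it is not GoHive's code.

```go
package main

import "fmt"

// scanInt32 is a hypothetical scanner. value == nil models a SQL NULL.
func scanInt32(dest **int32, value *int32) {
	if value == nil {
		*dest = nil // NULL: the caller's pointer becomes nil
		return
	}
	if *dest == nil {
		*dest = new(int32) // allocate when the caller passed a nil pointer
	}
	**dest = *value
}

// describe renders a possibly nil pointer for printing.
func describe(p *int32) string {
	if p == nil {
		return "NULL"
	}
	return fmt.Sprint(*p)
}

func main() {
	zero := int32(0)
	var i *int32 = new(int32)

	scanInt32(&i, nil)
	fmt.Println(describe(i))

	scanInt32(&i, &zero)
	fmt.Println(describe(i))
}
```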

Alternatively, with the row map API, m := cursor.RowMap(context.Background()), m would be map[string]interface{}{"table_name.column_name": nil} for a NULL value. The returned map is keyed by table_name.column_name. This works fine with Hive, but with the Spark Thrift SQL server table_name is not present and the keys are just column_name. That can lead to problems if two tables have the same column name, so the FetchOne API should be used in that case.
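
The key collision is easy to see with plain strings. The sketch below uses two hypothetical helpers, columnName and collidingColumns, to show which columns would clash once the table_name prefix is gone; neither is part of GoHive.

```go
package main

import (
	"fmt"
	"strings"
)

// columnName strips an optional "table_name." prefix from a row map key.
func columnName(key string) string {
	if i := strings.LastIndex(key, "."); i >= 0 {
		return key[i+1:]
	}
	return key
}

// collidingColumns returns the bare column names that appear more than once.
func collidingColumns(keys []string) []string {
	seen := map[string]int{}
	var dup []string
	for _, k := range keys {
		c := columnName(k)
		seen[c]++
		if seen[c] == 2 {
			dup = append(dup, c)
		}
	}
	return dup
}

func main() {
	keys := []string{"orders.id", "customers.id", "customers.name"}
	fmt.Println(collidingColumns(keys))
}
```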

Running tests

Tests can be run with:

./scripts/integration

This uses dhive and starts two docker instances with hive and kerberos. kinit, klist and kdestroy have to be installed locally, and hs2.example.com has to be an alias for 127.0.0.1 in /etc/hosts. The krb5 configuration file should be created with bash scripts/create_krbconf.sh. Overall, the steps used in the Travis CI configuration can be followed.

Documentation

Constants

const (
	START    = 1
	OK       = 2
	BAD      = 3
	ERROR    = 4
	COMPLETE = 5
)
const DEFAULT_FETCH_SIZE int64 = 1000
const DEFAULT_MAX_LENGTH = 16384000
const ZOOKEEPER_DEFAULT_NAMESPACE = "hiveserver2"

Variables

var DEFAULT_ERROR_CODE = int32(-1)
var DEFAULT_ERROR_MESSAGE = "unknown error"
var DEFAULT_SQL_STATE = ""
var DEFAULT_STATUS = hiveserver.TStatus{
	StatusCode:   hiveserver.TStatusCode_ERROR_STATUS,
	InfoMessages: nil,
	SqlState:     &DEFAULT_SQL_STATE,
	ErrorCode:    &DEFAULT_ERROR_CODE,
	ErrorMessage: &DEFAULT_ERROR_MESSAGE,
}

Functions

This section is empty.

Types

type ConnectConfiguration

type ConnectConfiguration struct {
	Username             string
	Principal            string
	Password             string
	Service              string
	HiveConfiguration    map[string]string
	PollIntervalInMillis int
	FetchSize            int64
	TransportMode        string
	HTTPPath             string
	TLSConfig            *tls.Config
	ZookeeperNamespace   string
	Database             string
	ConnectTimeout       time.Duration
	SocketTimeout        time.Duration
	HttpTimeout          time.Duration
	DialContext          DialContextFunc
	DisableKeepAlives    bool
	// Maximum length of the data in bytes. Used for SASL.
	MaxSize uint32
}

ConnectConfiguration is the configuration for the connection. The fields have to be filled in manually, but not all of them are required; which ones are needed depends on the auth mechanism and the kind of connection.
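
As a rough illustration of "depends on the auth and kind of connection", the sketch below checks a trimmed-down copy of the struct against two rules inferred from the README examples: Kerberos sets Service, and http transport sets HTTPPath. Both the local config type and the missingFields helper are hypothetical, and the rules are assumptions rather than checks the package is known to perform.

```go
package main

import "fmt"

// config mirrors a few ConnectConfiguration fields for illustration only.
type config struct {
	Username      string
	Password      string
	Service       string
	TransportMode string
	HTTPPath      string
}

// missingFields lists fields that the README examples set for the given
// auth mechanism and transport mode but that are empty in c.
func missingFields(auth string, c config) []string {
	var missing []string
	if auth == "KERBEROS" && c.Service == "" {
		missing = append(missing, "Service")
	}
	if c.TransportMode == "http" && c.HTTPPath == "" {
		missing = append(missing, "HTTPPath")
	}
	return missing
}

func main() {
	c := config{TransportMode: "http"}
	fmt.Println(missingFields("KERBEROS", c))
}
```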

func NewConnectConfiguration

func NewConnectConfiguration() *ConnectConfiguration

NewConnectConfiguration returns a connect configuration with all fields empty.

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection holds the information for getting a cursor to hive

func Connect

func Connect(host string, port int, auth string,
	configuration *ConnectConfiguration) (conn *Connection, err error)

Connect to hive server

func ConnectZookeeper

func ConnectZookeeper(hosts string, auth string,
	configuration *ConnectConfiguration) (conn *Connection, err error)

Connect to zookeeper to get hive hosts and then connect to hive. hosts is in format host1:port1,host2:port2,host3:port3 (zookeeper hosts).

func (*Connection) Close

func (c *Connection) Close() error

Close closes a session

func (*Connection) Cursor

func (c *Connection) Cursor() *Cursor

Cursor creates a cursor from a connection

type Cursor

type Cursor struct {
	Err error

	// Caller is responsible for managing this channel
	Logs chan<- []string
	// contains filtered or unexported fields
}

Cursor is used for fetching the rows after a query
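
Since the caller is responsible for managing the Logs channel, a typical pattern is to create the channel, drain it from a separate goroutine, and close it once no more logs are expected. The sketch below shows that pattern with a fake producer standing in for the cursor; collectLogs and demo are hypothetical names.

```go
package main

import "fmt"

// collectLogs drains a log channel until it is closed and returns every line.
func collectLogs(logs <-chan []string, done chan<- []string) {
	var all []string
	for batch := range logs {
		all = append(all, batch...)
	}
	done <- all
}

// demo wires a fake producer (standing in for the cursor) to the consumer.
func demo() []string {
	logs := make(chan []string, 4) // the caller creates and owns the channel
	done := make(chan []string)
	go collectLogs(logs, done)

	var send chan<- []string = logs // the producer only needs the send side
	send <- []string{"compiling", "launching job"}
	send <- []string{"job finished"}

	close(logs) // the caller closes it once no more logs are expected
	return <-done
}

func main() {
	fmt.Println(demo())
}
```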

func (*Cursor) Cancel

func (c *Cursor) Cancel()

Cancels the current operation

func (*Cursor) Close

func (c *Cursor) Close()

Close closes the cursor

func (*Cursor) Description

func (c *Cursor) Description() [][]string

Description returns the names of the columns and their types. It must be called after a FetchResult request. A context should be added here, but it seems to be ignored by thrift.

func (*Cursor) Error

func (c *Cursor) Error() error

func (*Cursor) Exec

func (c *Cursor) Exec(ctx context.Context, query string)

Exec issues a synchronous query.

func (*Cursor) Execute

func (c *Cursor) Execute(ctx context.Context, query string, async bool)

Execute sends a query to hive for execution with a context

func (*Cursor) FetchLogs

func (c *Cursor) FetchLogs() []string

FetchLogs returns all the Hive execution logs for the latest query up to the current point

func (*Cursor) FetchOne

func (c *Cursor) FetchOne(ctx context.Context, dests ...interface{})

FetchOne returns one row and advances the cursor one

func (*Cursor) Finished

func (c *Cursor) Finished() bool

Finished returns true if the last async operation has finished

func (*Cursor) HasMore

func (c *Cursor) HasMore(ctx context.Context) bool

HasMore returns whether more rows can be fetched from the server

func (*Cursor) Poll

func (c *Cursor) Poll(getProgress bool) (status *hiveserver.TGetOperationStatusResp)

Poll returns the current status of the last operation

func (*Cursor) RowMap

func (c *Cursor) RowMap(ctx context.Context) map[string]interface{}

RowMap returns one row as a map. Advances the cursor one

func (*Cursor) WaitForCompletion

func (c *Cursor) WaitForCompletion(ctx context.Context)

WaitForCompletion waits for an async operation to finish
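
The async methods (Execute with async set, Poll, Finished, WaitForCompletion) follow a familiar poll-until-done shape. The sketch below shows one way such a wait loop can be written with a context and a poll interval. waitUntil and demoWait are hypothetical and use a plain callback instead of a real cursor, so this is not a description of how WaitForCompletion is implemented.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitUntil polls finished every interval until it returns true or ctx ends.
func waitUntil(ctx context.Context, interval time.Duration, finished func() bool) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if finished() {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// demoWait runs one wait that completes on the third poll and one that
// never completes and is cut short by a context deadline.
func demoWait() (polls int, timedOut bool) {
	_ = waitUntil(context.Background(), time.Millisecond, func() bool {
		polls++
		return polls >= 3
	})

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
	defer cancel()
	err := waitUntil(ctx, time.Millisecond, func() bool { return false })
	return polls, errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(demoWait())
}
```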

type DialContextFunc

type DialContextFunc func(ctx context.Context, network, addr string) (net.Conn, error)
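
DialContextFunc has the same signature as the DialContext method of net.Dialer, so a standard dialer can be used to build one. The sketch below redeclares the type locally to stay self-contained; timeoutDialer and dialLoopback are hypothetical helpers, and the keep-alive value is an arbitrary choice.

```go
package main

import (
	"context"
	"fmt"
	"net"
	"time"
)

// DialContextFunc is redeclared here with the signature shown above.
type DialContextFunc func(ctx context.Context, network, addr string) (net.Conn, error)

// timeoutDialer returns a DialContextFunc backed by a net.Dialer.
func timeoutDialer(timeout time.Duration) DialContextFunc {
	d := &net.Dialer{Timeout: timeout, KeepAlive: 30 * time.Second}
	return d.DialContext
}

// dialLoopback opens a throwaway local listener and dials it once.
func dialLoopback() error {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err
	}
	defer ln.Close()

	dial := timeoutDialer(time.Second)
	conn, err := dial(context.Background(), "tcp", ln.Addr().String())
	if err != nil {
		return err
	}
	return conn.Close()
}

func main() {
	fmt.Println(dialLoopback())
}
```

A function built this way could be assigned to the DialContext field of ConnectConfiguration.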

type HiveError

type HiveError struct {

	// Simple error message, without the full stack trace. Surfaced from Thrift.
	Message string
	// See https://github.com/apache/hive/blob/master/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java for info about error codes
	ErrorCode int
	// contains filtered or unexported fields
}

HiveError represents an error surfaced from Hive. We attach the specific Error code along with the usual message.
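
Error codes are most useful when the caller can pull them out of a wrapped error. The sketch below shows the standard errors.As pattern on a local hiveError type that mirrors the two exported fields. The local type, its Error method and the sample values are all hypothetical; how the package's own HiveError is returned from a cursor is not shown here.

```go
package main

import (
	"errors"
	"fmt"
)

// hiveError is a local stand-in with the same exported fields as HiveError.
type hiveError struct {
	Message   string
	ErrorCode int
}

func (e *hiveError) Error() string {
	return fmt.Sprintf("hive error %d: %s", e.ErrorCode, e.Message)
}

// errorCode extracts the code from err if a *hiveError is in its chain.
func errorCode(err error) (int, bool) {
	var he *hiveError
	if errors.As(err, &he) {
		return he.ErrorCode, true
	}
	return 0, false
}

// sample wraps a made-up hive error the way calling code might.
func sample() error {
	return fmt.Errorf("exec failed: %w", &hiveError{Message: "Table not found", ErrorCode: 10001})
}

// plain returns an error that carries no hive error code.
func plain() error {
	return errors.New("connection reset")
}

func main() {
	fmt.Println(errorCode(sample()))
	fmt.Println(errorCode(plain()))
}
```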

type TSaslTransport

type TSaslTransport struct {
	OpeningContext context.Context
	// contains filtered or unexported fields
}

TSaslTransport is a thrift transport struct that uses SASL

func NewTSaslTransport

func NewTSaslTransport(trans thrift.TTransport, host string, mechanismName string, configuration map[string]string, maxLength uint32) (*TSaslTransport, error)

NewTSaslTransport returns a TSaslTransport

func (*TSaslTransport) Close

func (p *TSaslTransport) Close() (err error)

Close closes a SASL transport connection

func (*TSaslTransport) Flush

func (p *TSaslTransport) Flush(ctx context.Context) (err error)

Flush the bytes in the buffer

func (*TSaslTransport) IsOpen

func (p *TSaslTransport) IsOpen() bool

IsOpen checks whether a SASL transport connection is open

func (*TSaslTransport) Open

func (p *TSaslTransport) Open() (err error)

Open opens a SASL transport connection

func (*TSaslTransport) Read

func (p *TSaslTransport) Read(buf []byte) (l int, err error)

func (*TSaslTransport) RemainingBytes

func (p *TSaslTransport) RemainingBytes() uint64

RemainingBytes returns the size of the unwrapped bytes

func (*TSaslTransport) SetMaxLength

func (p *TSaslTransport) SetMaxLength(maxLength uint32)

SetMaxLength sets the maxLength

func (*TSaslTransport) Write

func (p *TSaslTransport) Write(buf []byte) (int, error)
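
To show why a maximum length matters for a framed transport, here is a minimal sketch of length-prefixed framing with a size check on read. It assumes a 4-byte big-endian length header, which is the usual convention for framed thrift transports; it is not claimed to be this package's exact wire format. writeFrame, readFrame and roundTrip are hypothetical names.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeFrame writes a 4-byte big-endian length followed by the payload.
func writeFrame(w io.Writer, payload []byte) error {
	var hdr [4]byte
	binary.BigEndian.PutUint32(hdr[:], uint32(len(payload)))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readFrame reads one frame and rejects it if it is longer than maxLength.
func readFrame(r io.Reader, maxLength uint32) ([]byte, error) {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return nil, err
	}
	n := binary.BigEndian.Uint32(hdr[:])
	if n > maxLength {
		return nil, fmt.Errorf("frame of %d bytes exceeds maximum of %d", n, maxLength)
	}
	buf := make([]byte, n)
	_, err := io.ReadFull(r, buf)
	return buf, err
}

// roundTrip writes msg as one frame into a buffer and reads it back.
func roundTrip(msg string, maxLength uint32) (string, error) {
	var b bytes.Buffer
	if err := writeFrame(&b, []byte(msg)); err != nil {
		return "", err
	}
	out, err := readFrame(&b, maxLength)
	return string(out), err
}

func main() {
	fmt.Println(roundTrip("hello", 16384000)) // same value as DEFAULT_MAX_LENGTH
	fmt.Println(roundTrip("hello", 4))
}
```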
