Commit 0875e2a4 authored by Brian Major's avatar Brian Major
Browse files

ac2 - connection pools refactored

parent e8622183
Loading
Loading
Loading
Loading
+8 −0
Original line number Diff line number Diff line
@@ -20,6 +20,14 @@ readWrite.poolPolicy = roundRobin
readWrite.maxWait = 30000
readWrite.createIfNeeded = false

# Unbound-Read-write connection pool
unboundReadOnly.servers = proc5-03.cadc.dao.nrc.ca
unboundReadOnly.poolInitSize = 1
unboundReadOnly.poolMaxSize = 1
unboundReadOnly.poolPolicy = roundRobin
unboundReadOnly.maxWait = 30000
unboundReadOnly.createIfNeeded = false

# server configuration -- applies to all servers
dbrcHost = devLdap
port = 636
+8 −0
Original line number Diff line number Diff line
@@ -21,6 +21,14 @@ readWrite.poolPolicy = <roundRobin || fewestConnections>
readWrite.maxWait = <timeout wait time in milliseconds>
readWrite.createIfNeeded = <true || false> Go beyond poolMaxSize

# Unbound-Read-only connection pool
unboundReadOnly.servers = <list of ldap servers for readonly unbound access>
unboundReadOnly.poolInitSize = <number of initial connections in the readonly pool>
unboundReadOnly.poolMaxSize = <maximum number of connections in the readonly pool>
unboundReadOnly.poolPolicy = <roundRobin || fewestConnections>
unboundReadOnly.maxWait = <timeout wait time in milliseconds>
unboundReadOnly.createIfNeeded = <true || false> Go beyond poolMaxSize

# server configuration -- applies to all servers
dbrcHost = <prodLdap || devLdap>
port = <389 or 636>
+50 −23
Original line number Diff line number Diff line
@@ -81,7 +81,7 @@ import ca.nrc.cadc.util.MultiValuedProperties;
import ca.nrc.cadc.util.PropertiesReader;

/**
 * Reads and stores the LDAP configuration information. The information
 * Reads and stores the LDAP configuration information.
 *
 * @author adriand
 *
@@ -94,6 +94,7 @@ public class LdapConfig

    public static final String READONLY_PREFIX = "readOnly.";
    public static final String READWRITE_PREFIX = "readWrite.";
    public static final String UB_READONLY_PREFIX = "unboundReadOnly.";
    public static final String POOL_SERVERS = "servers";
    public static final String POOL_INIT_SIZE = "poolInitSize";
    public static final String POOL_MAX_SIZE = "poolMaxSize";
@@ -151,6 +152,23 @@ public class LdapConfig
            return createIfNeeded;
        }

        @Override
        public String toString()
        {
            StringBuilder sb = new StringBuilder();
            sb.append(" Servers: ");
            for (String server : servers)
            {
                sb.append(" [" + server + "]");
            }
            sb.append(" initSize: " + initSize);
            sb.append(" maxSize: " + maxSize);
            sb.append(" policy: " + policy);
            sb.append(" maxWait: " + maxWait);
            sb.append(" createIfNeeded: " + createIfNeeded);
            return sb.toString();
        }

        @Override
        public boolean equals(Object other)
        {
@@ -183,6 +201,7 @@ public class LdapConfig

    private LdapPool readOnlyPool = new LdapPool();
    private LdapPool readWritePool = new LdapPool();
    private LdapPool unboundReadOnlyPool = new LdapPool();
    private int port;
    private String usersDN;
    private String userRequestsDN;
@@ -204,16 +223,15 @@ public class LdapConfig

    public static LdapConfig getLdapConfig()
    {
        return getLdapConfig(CONFIG);
        return loadLdapConfig(CONFIG);
    }

    public static LdapConfig getLdapConfig(String ldapProperties)
    public static LdapConfig loadLdapConfig(String ldapProperties)
    {
        logger.debug("Reading LDAP properties from: " + ldapProperties);
        PropertiesReader pr = new PropertiesReader(ldapProperties);

        MultiValuedProperties config = pr.getAllProperties();

        if (config == null || config.keySet() == null)
        {
            throw new RuntimeException("failed to read any LDAP property ");
@@ -221,19 +239,9 @@ public class LdapConfig

        LdapConfig ldapConfig = new LdapConfig();

        ldapConfig.readOnlyPool.servers = getMultiProperty(pr, READONLY_PREFIX + POOL_SERVERS);
        ldapConfig.readOnlyPool.initSize = Integer.valueOf(getProperty(pr, READONLY_PREFIX + POOL_INIT_SIZE));
        ldapConfig.readOnlyPool.maxSize = Integer.valueOf(getProperty(pr, READONLY_PREFIX + POOL_MAX_SIZE));
        ldapConfig.readOnlyPool.policy = PoolPolicy.valueOf(getProperty(pr, READONLY_PREFIX + POOL_POLICY));
        ldapConfig.readOnlyPool.maxWait = Long.valueOf(getProperty(pr, READONLY_PREFIX + MAX_WAIT));
        ldapConfig.readOnlyPool.createIfNeeded = Boolean.valueOf(getProperty(pr, READONLY_PREFIX + CREATE_IF_NEEDED));

        ldapConfig.readWritePool.servers = getMultiProperty(pr, READWRITE_PREFIX + POOL_SERVERS);
        ldapConfig.readWritePool.initSize = Integer.valueOf(getProperty(pr, READWRITE_PREFIX + POOL_INIT_SIZE));
        ldapConfig.readWritePool.maxSize = Integer.valueOf(getProperty(pr, READWRITE_PREFIX + POOL_MAX_SIZE));
        ldapConfig.readWritePool.policy = PoolPolicy.valueOf(getProperty(pr, READWRITE_PREFIX + POOL_POLICY));
        ldapConfig.readWritePool.maxWait = Long.valueOf(getProperty(pr, READONLY_PREFIX + MAX_WAIT));
        ldapConfig.readWritePool.createIfNeeded = Boolean.valueOf(getProperty(pr, READONLY_PREFIX + CREATE_IF_NEEDED));
        loadPoolConfig(ldapConfig.readOnlyPool, pr, READONLY_PREFIX);
        loadPoolConfig(ldapConfig.readWritePool, pr, READWRITE_PREFIX);
        loadPoolConfig(ldapConfig.unboundReadOnlyPool, pr, UB_READONLY_PREFIX);

        ldapConfig.dbrcHost = getProperty(pr, LDAP_DBRC_ENTRY);
        ldapConfig.port = Integer.valueOf(getProperty(pr, LDAP_PORT));
@@ -265,6 +273,16 @@ public class LdapConfig
        return ldapConfig;
    }

    private static void loadPoolConfig(LdapPool pool, PropertiesReader pr, String prefix)
    {
        pool.servers = getMultiProperty(pr, prefix + POOL_SERVERS);
        pool.initSize = Integer.valueOf(getProperty(pr, prefix + POOL_INIT_SIZE));
        pool.maxSize = Integer.valueOf(getProperty(pr, prefix + POOL_MAX_SIZE));
        pool.policy = PoolPolicy.valueOf(getProperty(pr, prefix + POOL_POLICY));
        pool.maxWait = Long.valueOf(getProperty(pr, prefix + MAX_WAIT));
        pool.createIfNeeded = Boolean.valueOf(getProperty(pr, prefix + CREATE_IF_NEEDED));
    }

    private static String getProperty(PropertiesReader properties, String key)
    {
        String prop = properties.getFirstPropertyValue(key);
@@ -321,6 +339,9 @@ public class LdapConfig
        if ( !(l.readWritePool.equals(readWritePool)))
            return false;

        if ( !(l.unboundReadOnlyPool.equals(unboundReadOnlyPool)))
            return false;

        return true;
    }

@@ -338,6 +359,11 @@ public class LdapConfig
        return readWritePool;
    }

    public LdapPool getUnboundReadOnlyPool()
    {
        return unboundReadOnlyPool;
    }

    public String getUsersDN()
    {
        return this.usersDN;
@@ -386,12 +412,13 @@ public class LdapConfig
    public String toString()
    {
        StringBuilder sb = new StringBuilder();
        sb.append("ldap dbrc host = ");
        sb.append(dbrcHost);
        sb.append(" port = ");
        sb.append(port);
        sb.append(" proxyUserDN = ");
        sb.append(proxyUserDN);
        sb.append(" ReadOnlyPool: [" + readOnlyPool + "]");
        sb.append(" ReadWritePool: [" + readWritePool + "]");
        sb.append(" UnboundReadOnlyPool: [" + unboundReadOnlyPool + "]");
        sb.append(" Port: " + port);
        sb.append(" dbrcHost: " + dbrcHost);
        sb.append(" proxyUserDN: " + proxyUserDN);

        return sb.toString();
    }
}
+45 −131
Original line number Diff line number Diff line
@@ -81,7 +81,6 @@ import com.unboundid.ldap.sdk.LDAPConnection;
import com.unboundid.ldap.sdk.LDAPConnectionOptions;
import com.unboundid.ldap.sdk.LDAPConnectionPool;
import com.unboundid.ldap.sdk.LDAPException;
import com.unboundid.ldap.sdk.LDAPReadWriteConnectionPool;
import com.unboundid.ldap.sdk.RoundRobinServerSet;
import com.unboundid.ldap.sdk.ServerSet;
import com.unboundid.ldap.sdk.SimpleBindRequest;
@@ -98,51 +97,50 @@ public class LdapConnectionPool
{
    private static final Logger logger = Logger.getLogger(LdapConnectionPool.class);

    private static final int POOL_CHECK_INTERVAL_MILLESCONDS = 10000; // 10 seconds

    Profiler profiler = new Profiler(LdapConnectionPool.class);

    protected LdapConfig currentConfig;
    private LDAPReadWriteConnectionPool pool;
    private String poolName;
    private LDAPConnectionPool pool;
    private Object poolMonitor = new Object();
    private LDAPConnectionOptions connectionOptions;

    private long lastPoolCheck = System.currentTimeMillis();

    public LdapConnectionPool()
    {
        this(LdapConfig.getLdapConfig());
    }

    public LdapConnectionPool(LdapConfig config)
    public LdapConnectionPool(LdapConfig config, LdapPool poolConfig, String poolName, boolean boundPool)
    {
        if (config == null)
            throw new IllegalArgumentException("config required");
        if (poolConfig == null)
            throw new IllegalArgumentException("poolConfig required");
        if (poolName == null)
            throw new IllegalArgumentException("poolName required");

        connectionOptions = new LDAPConnectionOptions();
        connectionOptions.setUseSynchronousMode(true);
        connectionOptions.setAutoReconnect(true);
        currentConfig = config;
        this.poolName = poolName;
        synchronized (poolMonitor)
        {
            pool = createPool(currentConfig);
            profiler.checkpoint("Create pool");
            if (!boundPool)
                pool = createPool(config, poolConfig, poolName, null, null);
            else
                pool = createPool(config, poolConfig, poolName, config.getAdminUserDN(), config.getAdminPasswd());
            logger.debug(poolName + " statistics after create:\n" + pool.getConnectionPoolStatistics());
            profiler.checkpoint("Create read only pool.");
        }
    }

    public LDAPConnection getReadOnlyConnection() throws TransientException
    public LDAPConnection getConnection() throws TransientException
    {
        poolCheck();

        try
        {
            LDAPConnection conn = null;
            synchronized (poolMonitor)
            {
                conn = pool.getReadConnection();
                conn = pool.getConnection();
            }
            logger.debug("Read pool statistics after borrow:\n" + pool.getReadPoolStatistics());
            profiler.checkpoint("get read only connection");
            logger.debug(poolName + " pool statistics after borrow:\n" + pool.getConnectionPoolStatistics());
            profiler.checkpoint("get " + poolName + " only connection");
            conn.setConnectionOptions(connectionOptions);

            return conn;
@@ -153,40 +151,10 @@ public class LdapConnectionPool
        }
    }

    public LDAPConnection getReadWriteConnection() throws TransientException
    {
        poolCheck();

        try
        {
            LDAPConnection conn = null;
            synchronized (poolMonitor)
    public void releaseConnection(LDAPConnection conn)
    {
                conn = pool.getWriteConnection();
            }

            logger.debug("write pool statistics after borrow:\n" + pool.getWritePoolStatistics());
            profiler.checkpoint("get read write connection");
            conn.setConnectionOptions(connectionOptions);

            return conn;
        }
        catch (LDAPException e)
        {
            throw new TransientException("Failed to get read write connection", e);
        }
    }

    public void releaseReadOnlyConnection(LDAPConnection conn)
    {
        pool.releaseReadConnection(conn);
        logger.debug("Read pool statistics after release:\n" + pool.getReadPoolStatistics());
    }

    public void releaseReadWriteConnection(LDAPConnection conn)
    {
        pool.releaseWriteConnection(conn);
        logger.debug("write pool statistics after release:\n" + pool.getWritePoolStatistics());
        pool.releaseConnection(conn);
        logger.debug(poolName + " pool statistics after release:\n" + pool.getConnectionPoolStatistics());
    }

    public LdapConfig getCurrentConfig()
@@ -196,9 +164,9 @@ public class LdapConnectionPool

    public void shutdown()
    {
        logger.debug("Shutting down pool");
        logger.debug("Closing pool...");
        pool.close();
        profiler.checkpoint("Shutdown pool");
        profiler.checkpoint("Pool closed.");
    }

    @Override
@@ -209,104 +177,50 @@ public class LdapConnectionPool
            pool.close();
    }

    private void poolCheck()
    {
        if (timeToCheckPool())
        {
            // check to see if the configuration has changed
            logger.debug("checking for ldap config change");
            LdapConfig newConfig = LdapConfig.getLdapConfig();
            if (!newConfig.equals(currentConfig))
            {
                logger.debug("Detected ldap configuration change, rebuilding pools");
                boolean poolRecreated = false;
                final LDAPReadWriteConnectionPool oldPool = pool;
    private LDAPConnectionPool createPool(LdapConfig config, LdapPool poolConfig, String poolName, String bindID, String bindPW)

                synchronized (poolMonitor)
                {
                    // check to see if another thread has already
                    // done the work
                    if (timeToCheckPool())
                    {
                        this.currentConfig = newConfig;
                        pool = createPool(currentConfig);
                        profiler.checkpoint("Rebuild pool");
                        lastPoolCheck = System.currentTimeMillis();
                        poolRecreated = true;
                    }
                }

                if (poolRecreated)
                {
                    // close the old pool in a separate thread
                    Runnable closeOldPool = new Runnable()
                    {
                        public void run()
                        {
                            logger.debug("Closing old pool...");
                            oldPool.close();
                            logger.debug("Old pool closed.");
                        }
                    };
                    Thread closePoolThread = new Thread(closeOldPool);
                    closePoolThread.start();
                }

            }
            else
            {
                lastPoolCheck = System.currentTimeMillis();
            }
        }
    }

    private boolean timeToCheckPool()
    {
        return (System.currentTimeMillis() - lastPoolCheck) > POOL_CHECK_INTERVAL_MILLESCONDS;
    }

    private LDAPReadWriteConnectionPool createPool(LdapConfig config)
    {
        LDAPConnectionPool ro = createPool(config.getReadOnlyPool(), config);
        LDAPConnectionPool rw = createPool(config.getReadOnlyPool(), config);
        LDAPReadWriteConnectionPool pool = new LDAPReadWriteConnectionPool(ro, rw);
        logger.debug("Read pool statistics after create:\n" + pool.getReadPoolStatistics());
        logger.debug("Write pool statistics after create:\n" + pool.getWritePoolStatistics());
        return pool;
    }

    private synchronized LDAPConnectionPool createPool(LdapPool pool, LdapConfig config)
    {
        try
        {
            logger.debug("LDAP Config: " + config);
            String[] hosts = pool.getServers().toArray(new String[0]);
            int[] ports = new int[pool.getServers().size()];
            for (int i=0; i<pool.getServers().size(); i++)
            String[] hosts = poolConfig.getServers().toArray(new String[0]);
            int[] ports = new int[poolConfig.getServers().size()];
            for (int i=0; i<poolConfig.getServers().size(); i++)
            {
                ports[i] = config.getPort();
            }

            ServerSet serverSet = null;
            if (pool.getPolicy().equals(PoolPolicy.roundRobin))
            if (poolConfig.getPolicy().equals(PoolPolicy.roundRobin))
            {
                serverSet = new RoundRobinServerSet(hosts, ports, LdapDAO.getSocketFactory(config));
            }
            else if (pool.getPolicy().equals(PoolPolicy.fewestConnections))
            else if (poolConfig.getPolicy().equals(PoolPolicy.fewestConnections))
            {
                serverSet = new FewestConnectionsServerSet(hosts, ports, LdapDAO.getSocketFactory(config));
            }
            else
            {
                throw new IllegalStateException("Unconfigured pool policy: " + pool.getPolicy());
                throw new IllegalStateException("Unconfigured pool policy: " + poolConfig.getPolicy());
            }

            SimpleBindRequest bindRequest = new SimpleBindRequest(config.getAdminUserDN(), config.getAdminPasswd());
            SimpleBindRequest bindRequest = null;
            if (bindID != null && bindPW != null)
            {
                logger.debug("Binding pool as " + bindID);
                bindRequest = new SimpleBindRequest(bindID, bindPW);
            }
            else
            {
                logger.debug("Binding pool annonymously");
                bindRequest = new SimpleBindRequest();
            }
            LDAPConnectionPool connectionPool = new LDAPConnectionPool(
                serverSet, bindRequest, pool.getInitSize(), pool.getMaxSize());
                serverSet, bindRequest, poolConfig.getInitSize(), poolConfig.getMaxSize());

            connectionPool.setCreateIfNecessary(pool.getCreateIfNeeded());
            connectionPool.setMaxWaitTimeMillis(pool.getMaxWait());
            connectionPool.setCreateIfNecessary(poolConfig.getCreateIfNeeded());
            connectionPool.setMaxWaitTimeMillis(poolConfig.getMaxWait());
            connectionPool.setConnectionPoolName(poolName);

            return connectionPool;
        }
+75 −35

File changed.

Preview size limit exceeded, changes collapsed.

Loading