/* * ResourceWordStream.java * * Copyright (C) 2005 Crutcher Dunnavant * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ import java.util.*; import java.io.*; import java.lang.*; final class ResourceWordStream implements WordStream { private String title; private String resourceId; /** * Locks entry through the WordStream interface. */ private SpinLock wordStreamLock = new SpinLock(); /** * Locks the WordStreamWatcher set by setWatcher(). */ private SpinLock wordStreamWatcherLock = new SpinLock(); /** * Locks the preFetch thread. */ private SpinLock preFetchLock = new SpinLock(); /** * Locks the findChunks thread. */ private SpinLock findChunksLock = new SpinLock(); private boolean haltFindChunks = false; /** * Locks entry to loadChunk. */ private SpinLock loadChunkLock = new SpinLock(); /** * Locks entry to ChunkCache actions. */ private SpinLock chunkCacheLock = new SpinLock(); /** * Stats. */ int numWords, numChunks, resourceSize; /* ============================================================= */ public ResourceWordStream (String title, String resourceId) { /* Open the initial stream. */ setStream(title, resourceId); } public void setStream (String title, String resourceId) { /* Lock out WordStream users. */ wordStreamLock.lock(); /* Lock out preFetch. */ preFetchLock.lock(); /* Halt and Release findChunks */ haltFindChunks = true; findChunksLock.lock(); findChunksLock.unlock(); haltFindChunks = false; /* Note: * At this point, no thread is proceding in findChunks() or * loadChunk() (though some may be exiting); and all new * getWordStream{Word,Title,Size}() requests will stall * until wordStreamLock is unlocked. */ /* Set the stream info. */ this.title = title; this.resourceId = resourceId; /* Zero the stats. */ numWords = 0; resourceSize = 0; numChunks = 0; /* Zero the chunk buffer. */ chunkCache = new Chunk[MAXCHUNKS]; /* Zero the chunk offset map. */ id2pos = new int[0]; /* Zero the character buffer. */ cbufStart = -1; cbufEnd = -1; /* Force a GC run. */ System.gc(); /* Start findChunks */ Thread t = new Thread() { public void run() { findChunks(); } }; t.start(); /* Release preFetch */ preFetchLock.unlock(); /* Release the stream to WordStream users. */ wordStreamLock.unlock(); /* Notifiy the WordStreamWatcher, if there is one. */ wordStreamWatcherLock.lock(); if (watcher != null) { watcher.streamChange(true); } wordStreamWatcherLock.unlock(); } /* ============================================================= */ /* ============================================================= */ /* WordStream interface functions. */ WordStreamWatcher watcher; public void setWatcher (WordStreamWatcher watcher) { wordStreamWatcherLock.lock(); this.watcher = watcher; wordStreamWatcherLock.unlock(); } public String getWordStreamTitle() { String ret; wordStreamLock.lock(); ret = title; wordStreamLock.unlock(); return ret; } public int getWordStreamSize() { int ret; wordStreamLock.lock(); ret = numWords; wordStreamLock.unlock(); return ret; } private int preFetchId = -1; public String getWordStreamWord (int index) { wordStreamLock.lock(); /* Bounds check. */ if ((index < 0) || (index > numWords)) { /* FIXME: Should this raise an exception? */ wordStreamLock.unlock(); return null; } int chunkID = index / CSIZE; int chunkIndex = index - chunkID * CSIZE; String word = getChunk(chunkID).warray[chunkIndex]; if (chunkIndex >= CSIZE - PFIDX) { int nextChunkId = chunkID + 1; if (nextChunkId < numChunks) { if (nextChunkId != preFetchId) { preFetchId = nextChunkId; Thread t = new Thread() { public void run() { preFetchLock.lock(); getChunk(preFetchId); preFetchLock.unlock(); } }; t.start(); } } } wordStreamLock.unlock(); return word; } /* ============================================================= */ /* ============================================================= */ private int[] id2pos; private void findChunks() { findChunksLock.lock(); InputStreamReader reader = new InputStreamReader (getClass().getResourceAsStream(resourceId)); int nextPos = 0; boolean inWord = false; boolean addWord = false; int wcount = 0; int wordmark = 0; int nlcount = 0; int i; int[] tmp; int v; char c; int chunkcount = 1; id2pos = new int[chunkcount]; id2pos[0] = 0; while (true) { try { v = reader.read(); nextPos ++; } catch (IOException e) { v = -1; } if (v == -1) { break; } if (haltFindChunks) { /* We've been told halt. */ findChunksLock.unlock(); return; } c = (char)v; if (inWord) { switch (c) { case '\n': case ' ': case '\r': case '\t': /* In a word, seeing ws. */ /* Reset nlcount. */ nlcount = (c == '\n') ? 1 : 0; /* Store the word. */ wordmark = nextPos - 1; /* Change state. */ inWord = false; break; default: /* In a word, seeing more chars. */ /* (NOOP) */ break; } } else { /* in whitespace. */ switch (c) { case '\n': nlcount ++; if (nlcount == 2) { /* Gen a blank word for * multiline runs of ws. */ addWord = true; } case ' ': case '\r': case '\t': /* In ws, seeing ws. */ /* (NOOP) */ break; default: /* In ws, seeing a word. */ /* Mark the current word. */ wordmark = nextPos - 1; addWord = true; /* Change state. */ inWord = true; break; } } if (addWord) { wcount ++; if ((wcount>1) && ((wcount%CSIZE)==1)) { //System.err.println("wcount: " + wcount); chunkcount ++; tmp = new int[chunkcount]; for (i = chunkcount - 2; i >= 0; i--) { tmp[i] = id2pos[i]; } tmp[chunkcount - 1] = wordmark; id2pos = tmp; /** * We are making available the * previous chunk, but not the * current chunk; assignment * order matters here. * * The begining of the first word * in this chunk _IS_ the length of * the stream up to this point. */ resourceSize = wordmark; numChunks = chunkcount - 1; numWords = numChunks * CSIZE; wordStreamWatcherLock.lock(); if (watcher != null) { watcher.streamChange(false); } wordStreamWatcherLock.unlock(); } addWord = false; } /* Yield every now and then. */ if ((wcount % 10) == 9) { Thread.currentThread().yield(); } } try { reader.close(); } catch (IOException e) { // noop. } reader = null; /** * Again, order matters here. */ resourceSize = nextPos; numChunks = chunkcount; numWords = wcount; wordStreamWatcherLock.lock(); if (watcher != null) { watcher.streamChange(false); } wordStreamWatcherLock.unlock(); findChunksLock.unlock(); } /* ============================================================= */ /* ============================================================= */ /* Character Buffer */ private final int CBUFSIZE = 64; private char[] cbuf = new char[CBUFSIZE]; /* cbufStart is the resource index of the first char in cbuf. * cbufEnd is the resource index of the last valid char in cbuf. */ private int cbufStart, cbufEnd; private InputStreamReader isr; private int isrNextCharIndex; private char getChar (int index) { if ((index < cbufStart) || (index > cbufEnd)) { loadBuffer(index); } return cbuf[index - cbufStart]; } private void loadBuffer (int index) { /* What buffer ends will bracket this char request? */ cbufStart = (index / CBUFSIZE) * CBUFSIZE; cbufEnd = cbufStart + CBUFSIZE - 1; if (cbufEnd >= resourceSize) { /* Don't overflow the resource. */ cbufEnd = resourceSize - 1; } /* Reopen the stream, if necessary. */ if ((isr == null) || (cbufStart < isrNextCharIndex)) { if (isr != null) { try { isr.close(); } catch (IOException e) { // noop. } isr = null; } isr = new InputStreamReader (getClass().getResourceAsStream(resourceId)); isrNextCharIndex = 0; } try { /* Skip to the read location. */ isr.skip(cbufStart - isrNextCharIndex); /* Read into the buffer. */ /* Note: read should read AT LEAST one byte. */ int len = isr.read(cbuf, 0, cbufEnd - cbufStart + 1); /* cbufEnd was what we wanted, here we make it what * we got. */ cbufEnd = cbufStart + len - 1; isrNextCharIndex = cbufEnd + 1; } catch (IOException e) { // FIXME: Should do something smarter with this. // System.err.println("dead in loadBuffer"); } } /* ============================================================= */ /* ============================================================= */ private final int CSIZE = 200; private final int PFIDX = 50; private final int MAXCHUNKS = 5; private class Chunk { public int id; public String[] warray; public Chunk() {}; } private Chunk[] chunkCache; private Chunk getChunk (int id) { Chunk c; c = chunkScan(id); if (c != null) { return c; } else { return loadChunk(id); } } private Chunk chunkScan (int id) { chunkCacheLock.lock(); Chunk c = null; int j = -1; /** Is the chunk already present? */ for (int i = 0; i < MAXCHUNKS; i++) { c = chunkCache[i]; if ((c != null) && (c.id == id)) { j = i; break; } } if (j == -1) { /* Chunk not present. */ c = null; } else if (j > 0) { /* Shift the found chunk to the top. */ for (; j > 0; j--) { chunkCache[j] = chunkCache[j-1]; } chunkCache[0] = c; } chunkCacheLock.unlock(); return c; } private void insertChunk (Chunk chunk) { chunkCacheLock.lock(); /* Shift the store, put the new chunk on top. */ for (int i = MAXCHUNKS - 1; i > 0; i--) { chunkCache[i] = chunkCache[i-1]; } chunkCache[0] = chunk; chunkCacheLock.unlock(); } private static final String emptyStr = ""; private Chunk loadChunk (int id) { Chunk chunk; loadChunkLock.lock(); if ((chunk = chunkScan(id)) != null) { loadChunkLock.unlock(); return chunk; } String[] warray = new String[CSIZE]; boolean inWord = false; int wcount = 0; int nlcount = 0; StringBuffer wbuf = null; int nextPos = id2pos[id]; int endPos = (id + 1 < numChunks) ? id2pos[id + 1] : resourceSize; while ((nextPos < endPos) && (wcount < CSIZE)) { char c = getChar(nextPos); nextPos ++; if (inWord) { switch (c) { case '\n': case ' ': case '\r': case '\t': /* In a word, seeing ws. */ /* Reset nlcount. */ nlcount = (c == '\n') ? 1 : 0; /* Store the word. */ warray[wcount] = wbuf.toString(); wcount ++; wbuf = null; /* Change state. */ inWord = false; break; default: /* In a word, seeing more chars. */ wbuf.append(c); break; } } else { /* in whitespace. */ switch (c) { case '\n': nlcount ++; if (nlcount == 2) { /* Gen a blank word for * multiline runs of ws. */ warray[wcount] = emptyStr; wcount ++; } case ' ': case '\r': case '\t': /* In ws, seeing ws. */ /* (NOOP) */ break; default: /* In ws, seeing a word. */ /* Start a new word word. */ wbuf = new StringBuffer(); wbuf.append(c); /* Change state. */ inWord = true; break; } } /* Yield every now and then. */ if ((wcount % 10) == 9) { Thread.currentThread().yield(); } } /* If we hit the end of the stream, it is possible for wbuf * to have a word in it, which we need to add. */ if (wbuf != null) { warray[wcount] = wbuf.toString(); wcount ++; } chunk = new Chunk(); chunk.id = id; chunk.warray = warray; insertChunk(chunk); loadChunkLock.unlock(); return chunk; } }